[SPARK-16967] Move Mesos to its own module

## What changes were proposed in this pull request?

Move the Mesos integration out of `core` and into its own Maven module (`mesos/`), built with the new `mesos` profile (`-Pmesos`). `SparkContext` no longer special-cases `mesos://` master URLs; the Mesos scheduler backends are now discovered through the `ExternalClusterManager` service-loader mechanism.

## How was this patch tested?

- unit tests
- manually submitting a client-mode and a cluster-mode job
- the Spark/Mesos integration test suite

Author: Michael Gummelt <[email protected]>

Closes #14637 from mgummelt/mesos-module.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e5475be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e5475be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e5475be

Branch: refs/heads/master
Commit: 8e5475be3c9a620f18f6712631b093464a7d0ee7
Parents: c0949dc
Author: Michael Gummelt <[email protected]>
Authored: Fri Aug 26 12:25:22 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Fri Aug 26 12:25:22 2016 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 assembly/pom.xml                                |  10 +
 core/pom.xml                                    |   5 -
 .../scala/org/apache/spark/SparkContext.scala   |  18 +-
 .../main/scala/org/apache/spark/TaskState.scala |  20 -
 .../deploy/mesos/MesosClusterDispatcher.scala   | 115 ---
 .../mesos/MesosClusterDispatcherArguments.scala | 109 ---
 .../deploy/mesos/MesosDriverDescription.scala   |  70 --
 .../mesos/MesosExternalShuffleService.scala     | 130 ----
 .../spark/deploy/mesos/ui/DriverPage.scala      | 179 -----
 .../deploy/mesos/ui/MesosClusterPage.scala      | 135 ----
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  |  49 --
 .../deploy/rest/mesos/MesosRestServer.scala     | 157 ----
 .../spark/executor/MesosExecutorBackend.scala   | 130 ----
 .../mesos/MesosClusterPersistenceEngine.scala   | 134 ----
 .../cluster/mesos/MesosClusterScheduler.scala   | 745 -------------------
 .../mesos/MesosClusterSchedulerSource.scala     |  40 -
 .../MesosCoarseGrainedSchedulerBackend.scala    | 642 ----------------
 .../MesosFineGrainedSchedulerBackend.scala      | 451 -----------
 .../mesos/MesosSchedulerBackendUtil.scala       | 165 ----
 .../cluster/mesos/MesosSchedulerUtils.scala     | 494 ------------
 .../cluster/mesos/MesosTaskLaunchData.scala     |  51 --
 .../SparkContextSchedulerCreationSuite.scala    |  28 -
 .../mesos/MesosClusterSchedulerSuite.scala      | 213 ------
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 517 -------------
 .../MesosFineGrainedSchedulerBackendSuite.scala | 385 ----------
 .../mesos/MesosSchedulerUtilsSuite.scala        | 255 -------
 .../mesos/MesosTaskLaunchDataSuite.scala        |  36 -
 .../spark/scheduler/cluster/mesos/Utils.scala   |  85 ---
 dev/create-release/release-build.sh             |  15 +-
 dev/lint-java                                   |   2 +-
 dev/mima                                        |   2 +-
 dev/scalastyle                                  |   1 +
 dev/sparktestsupport/modules.py                 |   7 +
 dev/test-dependencies.sh                        |   2 +-
 docs/building-spark.md                          |  24 +-
 mesos/pom.xml                                   | 109 +++
 ...pache.spark.scheduler.ExternalClusterManager |   1 +
 .../deploy/mesos/MesosClusterDispatcher.scala   | 115 +++
 .../mesos/MesosClusterDispatcherArguments.scala | 109 +++
 .../deploy/mesos/MesosDriverDescription.scala   |  70 ++
 .../mesos/MesosExternalShuffleService.scala     | 130 ++++
 .../spark/deploy/mesos/ui/DriverPage.scala      | 179 +++++
 .../deploy/mesos/ui/MesosClusterPage.scala      | 135 ++++
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  |  49 ++
 .../deploy/rest/mesos/MesosRestServer.scala     | 157 ++++
 .../spark/executor/MesosExecutorBackend.scala   | 131 ++++
 .../cluster/mesos/MesosClusterManager.scala     |  60 ++
 .../mesos/MesosClusterPersistenceEngine.scala   | 134 ++++
 .../cluster/mesos/MesosClusterScheduler.scala   | 745 +++++++++++++++++++
 .../mesos/MesosClusterSchedulerSource.scala     |  40 +
 .../MesosCoarseGrainedSchedulerBackend.scala    | 642 ++++++++++++++++
 .../MesosFineGrainedSchedulerBackend.scala      | 451 +++++++++++
 .../mesos/MesosSchedulerBackendUtil.scala       | 165 ++++
 .../cluster/mesos/MesosSchedulerUtils.scala     | 514 +++++++++++++
 .../cluster/mesos/MesosTaskLaunchData.scala     |  51 ++
 .../mesos/MesosClusterManagerSuite.scala        |  47 ++
 .../mesos/MesosClusterSchedulerSuite.scala      | 213 ++++++
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 517 +++++++++++++
 .../MesosFineGrainedSchedulerBackendSuite.scala | 385 ++++++++++
 .../mesos/MesosSchedulerUtilsSuite.scala        | 255 +++++++
 .../mesos/MesosTaskLaunchDataSuite.scala        |  36 +
 .../spark/scheduler/cluster/mesos/Utils.scala   |  85 +++
 pom.xml                                         |  21 +-
 project/MimaExcludes.scala                      |   4 +-
 project/SparkBuild.scala                        |   4 +-
 66 files changed, 5582 insertions(+), 5395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index c16f763..8739849 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
 
 # 6. Run lint-java.
 script:

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 971a62f..ec243ea 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -139,6 +139,16 @@
       </dependencies>
     </profile>
     <profile>
+      <id>mesos</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-mesos_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>hive</id>
       <dependencies>
         <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index ab6c3ce..c04cf7e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -216,11 +216,6 @@
       <artifactId>jersey-container-servlet-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.mesos</groupId>
-      <artifactId>mesos</artifactId>
-      <classifier>${mesos.classifier}</classifier>
-    </dependency>
-    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2eaeab1..08d6343 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
   TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
@@ -56,7 +55,6 @@ import org.apache.spark.rdd._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -2512,18 +2510,6 @@ object SparkContext extends Logging {
         }
         (backend, scheduler)
 
-      case MESOS_REGEX(mesosUrl) =>
-        MesosNativeLibrary.load()
-        val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
-        val backend = if (coarseGrained) {
-          new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
-        } else {
-          new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
-        }
-        scheduler.initialize(backend)
-        (backend, scheduler)
-
       case masterUrl =>
         val cm = getClusterManager(masterUrl) match {
           case Some(clusterMgr) => clusterMgr
@@ -2545,7 +2531,7 @@ object SparkContext extends Logging {
   private def getClusterManager(url: String): Option[ExternalClusterManager] = {
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoaders =
-    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
+      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
     if (serviceLoaders.size > 1) {
       throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
           s"for the url $url:")
@@ -2566,8 +2552,6 @@ private object SparkMasterRegex {
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
   // Regular expression for connecting to Spark deploy clusters
   val SPARK_REGEX = """spark://(.*)""".r
-  // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url
-  val MESOS_REGEX = """mesos://(.*)""".r
 }
 
 /**
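
With the `mesos://` special case removed from `SparkContext`, Mesos scheduler creation now flows through the `ExternalClusterManager` `ServiceLoader` path kept above: the diffstat adds a `MesosClusterManager.scala` (60 lines) to the new module together with a `META-INF/services/org.apache.spark.scheduler.ExternalClusterManager` registration. A minimal sketch of such a plugin, reconstructed from the deleted `SparkContext` logic (the actual contents of the new file may differ in detail):

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

private[spark] class MesosClusterManager extends ExternalClusterManager {
  private val MESOS_REGEX = """mesos://(.*)""".r

  // Claim mesos:// and mesos://zk:// master URLs, replacing the regex match removed above.
  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("mesos")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    MesosNativeLibrary.load()
    val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
    // Same coarse-vs-fine-grained choice the deleted SparkContext code made.
    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
    if (coarse) {
      new MesosCoarseGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl], sc, mesosUrl, sc.env.securityManager)
    } else {
      new MesosFineGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl], sc, mesosUrl)
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```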

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/TaskState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index d232fae..cbace7b 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
 private[spark] object TaskState extends Enumeration {
 
   val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
@@ -30,22 +28,4 @@ private[spark] object TaskState extends Enumeration {
   def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)
 
   def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)
-
-  def toMesos(state: TaskState): MesosTaskState = state match {
-    case LAUNCHING => MesosTaskState.TASK_STARTING
-    case RUNNING => MesosTaskState.TASK_RUNNING
-    case FINISHED => MesosTaskState.TASK_FINISHED
-    case FAILED => MesosTaskState.TASK_FAILED
-    case KILLED => MesosTaskState.TASK_KILLED
-    case LOST => MesosTaskState.TASK_LOST
-  }
-
-  def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
-    case MesosTaskState.TASK_FINISHED => FINISHED
-    case MesosTaskState.TASK_FAILED => FAILED
-    case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
-  }
 }
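
The removed `toMesos`/`fromMesos` mappings are not lost: in the diffstat, `MesosSchedulerUtils.scala` grows by exactly these 20 lines (494 deleted vs. 514 added), which suggests the conversions move into the module's utilities. A sketch of how they can be hosted there; the enclosing object name is hypothetical, while the match arms are verbatim from the deletion above:

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.{TaskState => MesosTaskState}

import org.apache.spark.TaskState

// Hypothetical host object; in the actual module these plausibly live alongside
// the other helpers (e.g. in MesosSchedulerUtils).
private[mesos] object TaskStateConversions {
  def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
    case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING
    case TaskState.RUNNING => MesosTaskState.TASK_RUNNING
    case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
    case TaskState.FAILED => MesosTaskState.TASK_FAILED
    case TaskState.KILLED => MesosTaskState.TASK_KILLED
    case TaskState.LOST => MesosTaskState.TASK_LOST
  }

  def taskStateFromMesos(mesosState: MesosTaskState): TaskState.TaskState = mesosState match {
    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
    case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
    case MesosTaskState.TASK_FAILED => TaskState.FAILED
    case MesosTaskState.TASK_KILLED => TaskState.KILLED
    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
  }
}
```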

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
deleted file mode 100644
index 73b6ca3..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import java.util.concurrent.CountDownLatch
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.mesos.ui.MesosClusterUI
-import org.apache.spark.deploy.rest.mesos.MesosRestServer
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos._
-import org.apache.spark.util.{ShutdownHookManager, Utils}
-
-/*
- * A dispatcher that is responsible for managing and launching drivers, and is intended to be
- * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
- * the cluster independently of Spark applications.
- * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
- * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
- * for resources.
- *
- * A typical new driver lifecycle is the following:
- * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
- * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
- * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
- *
- * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable
- * per driver launched.
- * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
- * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
- * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
- */
-private[mesos] class MesosClusterDispatcher(
-    args: MesosClusterDispatcherArguments,
-    conf: SparkConf)
-  extends Logging {
-
-  private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
-  private val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase()
-  logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
-
-  private val engineFactory = recoveryMode match {
-    case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
-    case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
-    case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
-  }
-
-  private val scheduler = new MesosClusterScheduler(engineFactory, conf)
-
-  private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
-  private val webUi = new MesosClusterUI(
-    new SecurityManager(conf),
-    args.webUiPort,
-    conf,
-    publicAddress,
-    scheduler)
-
-  private val shutdownLatch = new CountDownLatch(1)
-
-  def start(): Unit = {
-    webUi.bind()
-    scheduler.frameworkUrl = conf.get("spark.mesos.dispatcher.webui.url", webUi.activeWebUiUrl)
-    scheduler.start()
-    server.start()
-  }
-
-  def awaitShutdown(): Unit = {
-    shutdownLatch.await()
-  }
-
-  def stop(): Unit = {
-    webUi.stop()
-    server.stop()
-    scheduler.stop()
-    shutdownLatch.countDown()
-  }
-}
-
-private[mesos] object MesosClusterDispatcher extends Logging {
-  def main(args: Array[String]) {
-    Utils.initDaemon(log)
-    val conf = new SparkConf
-    val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
-    conf.setMaster(dispatcherArgs.masterUrl)
-    conf.setAppName(dispatcherArgs.name)
-    dispatcherArgs.zookeeperUrl.foreach { z =>
-      conf.set("spark.deploy.recoveryMode", "ZOOKEEPER")
-      conf.set("spark.deploy.zookeeper.url", z)
-    }
-    val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
-    dispatcher.start()
-    logDebug("Adding shutdown hook") // force eager creation of logger
-    ShutdownHookManager.addShutdownHook { () =>
-      logInfo("Shutdown hook is shutting down dispatcher")
-      dispatcher.stop()
-      dispatcher.awaitShutdown()
-    }
-    dispatcher.awaitShutdown()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
deleted file mode 100644
index 11e1344..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import scala.annotation.tailrec
-
-import org.apache.spark.SparkConf
-import org.apache.spark.util.{IntParam, Utils}
-
-
-private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
-  var host = Utils.localHostName()
-  var port = 7077
-  var name = "Spark Cluster"
-  var webUiPort = 8081
-  var masterUrl: String = _
-  var zookeeperUrl: Option[String] = None
-  var propertiesFile: String = _
-
-  parse(args.toList)
-
-  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
-
-  @tailrec
-  private def parse(args: List[String]): Unit = args match {
-    case ("--host" | "-h") :: value :: tail =>
-      Utils.checkHost(value, "Please use hostname " + value)
-      host = value
-      parse(tail)
-
-    case ("--port" | "-p") :: IntParam(value) :: tail =>
-      port = value
-      parse(tail)
-
-    case ("--webui-port") :: IntParam(value) :: tail =>
-      webUiPort = value
-      parse(tail)
-
-    case ("--zk" | "-z") :: value :: tail =>
-      zookeeperUrl = Some(value)
-      parse(tail)
-
-    case ("--master" | "-m") :: value :: tail =>
-      if (!value.startsWith("mesos://")) {
-        // scalastyle:off println
-        System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
-        // scalastyle:on println
-        System.exit(1)
-      }
-      masterUrl = value.stripPrefix("mesos://")
-      parse(tail)
-
-    case ("--name") :: value :: tail =>
-      name = value
-      parse(tail)
-
-    case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
-      parse(tail)
-
-    case ("--help") :: tail =>
-      printUsageAndExit(0)
-
-    case Nil =>
-      if (masterUrl == null) {
-        // scalastyle:off println
-        System.err.println("--master is required")
-        // scalastyle:on println
-        printUsageAndExit(1)
-      }
-
-    case _ =>
-      printUsageAndExit(1)
-  }
-
-  private def printUsageAndExit(exitCode: Int): Unit = {
-    // scalastyle:off println
-    System.err.println(
-      "Usage: MesosClusterDispatcher [options]\n" +
-        "\n" +
-        "Options:\n" +
-        "  -h HOST, --host HOST    Hostname to listen on\n" +
-        "  -p PORT, --port PORT    Port to listen on (default: 7077)\n" +
-        "  --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
-        "  --name NAME             Framework name to show in Mesos UI\n" +
-        "  -m --master MASTER      URI for connecting to Mesos master\n" +
-        "  -z --zk ZOOKEEPER       Comma delimited URLs for connecting to \n" +
-        "                          Zookeeper for persistence\n" +
-        "  --properties-file FILE  Path to a custom Spark properties file.\n" +
-        "                          Default is conf/spark-defaults.conf.")
-    // scalastyle:on println
-    System.exit(exitCode)
-  }
-}
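
One behavior of the parser above that is easy to miss: `--master` must use a `mesos://` scheme, but the scheme is stripped before the URL is stored in `masterUrl`. A small, hypothetical check (not part of this commit; it sits in the dispatcher's package because the class is `private[mesos]`):

```scala
package org.apache.spark.deploy.mesos

import org.apache.spark.SparkConf

// Hypothetical demo: exercises the parser deleted above (now in the mesos
// module) to show the "mesos://" prefix stripping and port handling.
object DispatcherArgsDemo {
  def main(args: Array[String]): Unit = {
    val parsed = new MesosClusterDispatcherArguments(
      Array("--master", "mesos://zk://zk1:2181/mesos", "--port", "7077"),
      new SparkConf(loadDefaults = false))
    assert(parsed.masterUrl == "zk://zk1:2181/mesos") // scheme stripped
    assert(parsed.port == 7077)
  }
}
```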

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
deleted file mode 100644
index d4c7022..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import java.util.Date
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.Command
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
-
-/**
- * Describes a Spark driver that is submitted from the
- * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by
- * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
- * @param jarUrl URL to the application jar
- * @param mem Amount of memory for the driver
- * @param cores Number of cores for the driver
- * @param supervise Supervise the driver for long running app
- * @param command The command to launch the driver.
- * @param schedulerProperties Extra properties to pass the Mesos scheduler
- */
-private[spark] class MesosDriverDescription(
-    val name: String,
-    val jarUrl: String,
-    val mem: Int,
-    val cores: Double,
-    val supervise: Boolean,
-    val command: Command,
-    schedulerProperties: Map[String, String],
-    val submissionId: String,
-    val submissionDate: Date,
-    val retryState: Option[MesosClusterRetryState] = None)
-  extends Serializable {
-
-  val conf = new SparkConf(false)
-  schedulerProperties.foreach {case (k, v) => conf.set(k, v)}
-
-  def copy(
-      name: String = name,
-      jarUrl: String = jarUrl,
-      mem: Int = mem,
-      cores: Double = cores,
-      supervise: Boolean = supervise,
-      command: Command = command,
-      schedulerProperties: SparkConf = conf,
-      submissionId: String = submissionId,
-      submissionDate: Date = submissionDate,
-      retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
-
-    new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
-      submissionId, submissionDate, retryState)
-  }
-
-  override def toString: String = s"MesosDriverDescription (${command.mainClass})"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
deleted file mode 100644
index 6b297c4..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.ExternalShuffleService
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
-import org.apache.spark.network.util.TransportConf
-import org.apache.spark.util.ThreadUtils
-
-/**
- * An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
- * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
- */
-private[mesos] class MesosExternalShuffleBlockHandler(
-    transportConf: TransportConf,
-    cleanerIntervalS: Long)
-  extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
-
-  ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
-    .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
-
-  // Stores a map of app id to app state (timeout value and last heartbeat)
-  private val connectedApps = new ConcurrentHashMap[String, AppState]()
-
-  protected override def handleMessage(
-      message: BlockTransferMessage,
-      client: TransportClient,
-      callback: RpcResponseCallback): Unit = {
-    message match {
-      case RegisterDriverParam(appId, appState) =>
-        val address = client.getSocketAddress
-        val timeout = appState.heartbeatTimeout
-        logInfo(s"Received registration request from app $appId (remote 
address $address, " +
-          s"heartbeat timeout $timeout ms).")
-        if (connectedApps.containsKey(appId)) {
-          logWarning(s"Received a registration request from app $appId, but it 
was already " +
-            s"registered")
-        }
-        connectedApps.put(appId, appState)
-        callback.onSuccess(ByteBuffer.allocate(0))
-      case Heartbeat(appId) =>
-        val address = client.getSocketAddress
-        Option(connectedApps.get(appId)) match {
-          case Some(existingAppState) =>
-            logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
-              s"address $address).")
-            existingAppState.lastHeartbeat = System.nanoTime()
-          case None =>
-            logWarning(s"Received ShuffleServiceHeartbeat from an unknown app 
(remote " +
-              s"address $address, appId '$appId').")
-        }
-      case _ => super.handleMessage(message, client, callback)
-    }
-  }
-
-  /** An extractor object for matching [[RegisterDriver]] message. */
-  private object RegisterDriverParam {
-    def unapply(r: RegisterDriver): Option[(String, AppState)] =
-      Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
-  }
-
-  private object Heartbeat {
-    def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
-  }
-
-  private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
-
-  private class CleanerThread extends Runnable {
-    override def run(): Unit = {
-      val now = System.nanoTime()
-      connectedApps.asScala.foreach { case (appId, appState) =>
-        if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
-          logInfo(s"Application $appId timed out. Removing shuffle files.")
-          connectedApps.remove(appId)
-          applicationRemoved(appId, true)
-        }
-      }
-    }
-  }
-}
-
-/**
- * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
- * to associate with. This allows the shuffle service to detect when a driver is terminated
- * and can clean up the associated shuffle files.
- */
-private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
-  extends ExternalShuffleService(conf, securityManager) {
-
-  protected override def newShuffleBlockHandler(
-      conf: TransportConf): ExternalShuffleBlockHandler = {
-    val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s")
-    new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS)
-  }
-}
-
-private[spark] object MesosExternalShuffleService extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    ExternalShuffleService.main(args,
-      (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm))
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
deleted file mode 100644
index cd98110..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState}
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {
-
-  override def render(request: HttpServletRequest): Seq[Node] = {
-    val driverId = request.getParameter("id")
-    require(driverId != null && driverId.nonEmpty, "Missing id parameter")
-
-    val state = parent.scheduler.getDriverState(driverId)
-    if (state.isEmpty) {
-      val content =
-        <div>
-          <p>Cannot find driver {driverId}</p>
-        </div>
-      return UIUtils.basicSparkPage(content, s"Details for Job $driverId")
-    }
-    val driverState = state.get
-    val driverHeaders = Seq("Driver property", "Value")
-    val schedulerHeaders = Seq("Scheduler property", "Value")
-    val commandEnvHeaders = Seq("Command environment variable", "Value")
-    val launchedHeaders = Seq("Launched property", "Value")
-    val commandHeaders = Seq("Command property", "Value")
-    val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
-    val driverDescription = Iterable.apply(driverState.description)
-    val submissionState = Iterable.apply(driverState.submissionState)
-    val command = Iterable.apply(driverState.description.command)
-    val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
-    val commandEnv = Iterable.apply(driverState.description.command.environment)
-    val driverTable =
-      UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
-    val commandTable =
-      UIUtils.listingTable(commandHeaders, commandRow, command)
-    val commandEnvTable =
-      UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv)
-    val schedulerTable =
-      UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties)
-    val launchedTable =
-      UIUtils.listingTable(launchedHeaders, launchedRow, submissionState)
-    val retryTable =
-      UIUtils.listingTable(
-        retryHeaders, retryRow, Iterable.apply(driverState.description.retryState))
-    val content =
-      <p>Driver state information for driver id {driverId}</p>
-        <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a>
-        <div class="row-fluid">
-          <div class="span12">
-            <h4>Driver state: {driverState.state}</h4>
-            <h4>Driver properties</h4>
-            {driverTable}
-            <h4>Driver command</h4>
-            {commandTable}
-            <h4>Driver command environment</h4>
-            {commandEnvTable}
-            <h4>Scheduler properties</h4>
-            {schedulerTable}
-            <h4>Launched state</h4>
-            {launchedTable}
-            <h4>Retry state</h4>
-            {retryTable}
-          </div>
-        </div>;
-
-    UIUtils.basicSparkPage(content, s"Details for Job $driverId")
-  }
-
-  private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
-    submissionState.map { state =>
-      <tr>
-        <td>Mesos Slave ID</td>
-        <td>{state.slaveId.getValue}</td>
-      </tr>
-      <tr>
-        <td>Mesos Task ID</td>
-        <td>{state.taskId.getValue}</td>
-      </tr>
-      <tr>
-        <td>Launch Time</td>
-        <td>{state.startDate}</td>
-      </tr>
-      <tr>
-        <td>Finish Time</td>
-        <td>{state.finishDate.map(_.toString).getOrElse("")}</td>
-      </tr>
-      <tr>
-        <td>Last Task Status</td>
-        <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
-      </tr>
-    }.getOrElse(Seq[Node]())
-  }
-
-  private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
-    properties.map { case (k, v) =>
-      <tr>
-        <td>{k}</td><td>{v}</td>
-      </tr>
-    }.toSeq
-  }
-
-  private def commandRow(command: Command): Seq[Node] = {
-    <tr>
-      <td>Main class</td><td>{command.mainClass}</td>
-    </tr>
-    <tr>
-      <td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
-    </tr>
-    <tr>
-      <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
-    </tr>
-    <tr>
-      <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td>
-    </tr>
-    <tr>
-      <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td>
-    </tr>
-  }
-
-  private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
-    <tr>
-      <td>Name</td><td>{driver.name}</td>
-    </tr>
-    <tr>
-      <td>Id</td><td>{driver.submissionId}</td>
-    </tr>
-    <tr>
-      <td>Cores</td><td>{driver.cores}</td>
-    </tr>
-    <tr>
-      <td>Memory</td><td>{driver.mem}</td>
-    </tr>
-    <tr>
-      <td>Submitted</td><td>{driver.submissionDate}</td>
-    </tr>
-    <tr>
-      <td>Supervise</td><td>{driver.supervise}</td>
-    </tr>
-  }
-
-  private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
-    retryState.map { state =>
-      <tr>
-        <td>
-          {state.lastFailureStatus}
-        </td>
-        <td>
-          {state.nextRetry}
-        </td>
-        <td>
-          {state.retries}
-        </td>
-      </tr>
-    }.getOrElse(Seq[Node]())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
deleted file mode 100644
index 8dcbdaa..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.mesos.Protos.TaskStatus
-
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
-  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val state = parent.scheduler.getSchedulerState()
-
-    val driverHeader = Seq("Driver ID")
-    val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
-    val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
-
-    val queuedHeaders = driverHeader ++ submissionHeader
-    val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
-      Seq("Start Date", "Mesos Slave ID", "State")
-    val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
-      Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
-    val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
-    val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
-    val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
-    val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
-    val content =
-      <p>Mesos Framework ID: {state.frameworkId}</p>
-      <div class="row-fluid">
-        <div class="span12">
-          <h4>Queued Drivers:</h4>
-          {queuedTable}
-          <h4>Launched Drivers:</h4>
-          {launchedTable}
-          <h4>Finished Drivers:</h4>
-          {finishedTable}
-          <h4>Supervise drivers waiting for retry:</h4>
-          {retryTable}
-        </div>
-      </div>;
-    UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
-  }
-
-  private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
-    val id = submission.submissionId
-    <tr>
-      <td><a href={s"driver?id=$id"}>{id}</a></td>
-      <td>{submission.submissionDate}</td>
-      <td>{submission.command.mainClass}</td>
-      <td>cpus: {submission.cores}, mem: {submission.mem}</td>
-    </tr>
-  }
-
-  private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
-    val id = state.driverDescription.submissionId
-
-    val historyCol = if (historyServerURL.isDefined) {
-      <td>
-        <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
-          {state.frameworkId}
-        </a>
-      </td>
-    } else Nil
-
-    <tr>
-      <td><a href={s"driver?id=$id"}>{id}</a></td>
-      {historyCol}
-      <td>{state.driverDescription.submissionDate}</td>
-      <td>{state.driverDescription.command.mainClass}</td>
-      <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
-      <td>{state.startDate}</td>
-      <td>{state.slaveId.getValue}</td>
-      <td>{stateString(state.mesosTaskStatus)}</td>
-    </tr>
-  }
-
-  private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
-    val id = submission.submissionId
-    <tr>
-      <td><a href={s"driver?id=$id"}>{id}</a></td>
-      <td>{submission.submissionDate}</td>
-      <td>{submission.command.mainClass}</td>
-      <td>{submission.retryState.get.lastFailureStatus}</td>
-      <td>{submission.retryState.get.nextRetry}</td>
-      <td>{submission.retryState.get.retries}</td>
-    </tr>
-  }
-
-  private def stateString(status: Option[TaskStatus]): String = {
-    if (status.isEmpty) {
-      return ""
-    }
-    val sb = new StringBuilder
-    val s = status.get
-    sb.append(s"State: ${s.getState}")
-    if (status.get.hasMessage) {
-      sb.append(s", Message: ${s.getMessage}")
-    }
-    if (status.get.hasHealthy) {
-      sb.append(s", Healthy: ${s.getHealthy}")
-    }
-    if (status.get.hasSource) {
-      sb.append(s", Source: ${s.getSource}")
-    }
-    if (status.get.hasReason) {
-      sb.append(s", Reason: ${s.getReason}")
-    }
-    if (status.get.hasTimestamp) {
-      sb.append(s", Time: ${s.getTimestamp}")
-    }
-    sb.toString()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
deleted file mode 100644
index 6049789..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos.ui
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
-import org.apache.spark.ui.{SparkUI, WebUI}
-import org.apache.spark.ui.JettyUtils._
-
-/**
- * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]
- */
-private[spark] class MesosClusterUI(
-    securityManager: SecurityManager,
-    port: Int,
-    val conf: SparkConf,
-    dispatcherPublicAddress: String,
-    val scheduler: MesosClusterScheduler)
-  extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
-
-  initialize()
-
-  def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort
-
-  override def initialize() {
-    attachPage(new MesosClusterPage(this))
-    attachPage(new DriverPage(this))
-    attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
-  }
-}
-
-private object MesosClusterUI {
-  val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
deleted file mode 100644
index 3b96488..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.rest.mesos
-
-import java.io.File
-import java.text.SimpleDateFormat
-import java.util.Date
-import java.util.concurrent.atomic.AtomicLong
-import javax.servlet.http.HttpServletResponse
-
-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.deploy.rest._
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
-import org.apache.spark.util.Utils
-
-/**
- * A server that responds to requests submitted by the [[RestSubmissionClient]].
- * All requests are forwarded to
- * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
- * This is intended to be used in Mesos cluster mode only.
- * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs.
- */
-private[spark] class MesosRestServer(
-    host: String,
-    requestedPort: Int,
-    masterConf: SparkConf,
-    scheduler: MesosClusterScheduler)
-  extends RestSubmissionServer(host, requestedPort, masterConf) {
-
-  protected override val submitRequestServlet =
-    new MesosSubmitRequestServlet(scheduler, masterConf)
-  protected override val killRequestServlet =
-    new MesosKillRequestServlet(scheduler, masterConf)
-  protected override val statusRequestServlet =
-    new MesosStatusRequestServlet(scheduler, masterConf)
-}
-
-private[mesos] class MesosSubmitRequestServlet(
-    scheduler: MesosClusterScheduler,
-    conf: SparkConf)
-  extends SubmitRequestServlet {
-
-  private val DEFAULT_SUPERVISE = false
-  private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb
-  private val DEFAULT_CORES = 1.0
-
-  private val nextDriverNumber = new AtomicLong(0)
-  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  private def newDriverId(submitDate: Date): String = {
-    "driver-%s-%04d".format(
-      createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())
-  }
-
-  /**
-   * Build a driver description from the fields specified in the submit request.
-   *
-   * This involves constructing a command that launches a mesos framework for the job.
-   * This does not currently consider fields used by python applications since python
-   * is not supported in mesos cluster mode yet.
-   */
-  private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
-    // Required fields, including the main class because python is not yet supported
-    val appResource = Option(request.appResource).getOrElse {
-      throw new SubmitRestMissingFieldException("Application jar is missing.")
-    }
-    val mainClass = Option(request.mainClass).getOrElse {
-      throw new SubmitRestMissingFieldException("Main class is missing.")
-    }
-
-    // Optional fields
-    val sparkProperties = request.sparkProperties
-    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
-    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
-    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
-    val superviseDriver = sparkProperties.get("spark.driver.supervise")
-    val driverMemory = sparkProperties.get("spark.driver.memory")
-    val driverCores = sparkProperties.get("spark.driver.cores")
-    val appArgs = request.appArgs
-    val environmentVariables = request.environmentVariables
-    val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
-
-    // Construct driver description
-    val conf = new SparkConf(false).setAll(sparkProperties)
-    val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
-    val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
-    val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
-    val javaOpts = sparkJavaOpts ++ extraJavaOpts
-    val command = new Command(
-      mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
-    val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
-    val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
-    val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
-    val submitDate = new Date()
-    val submissionId = newDriverId(submitDate)
-
-    new MesosDriverDescription(
-      name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
-      command, request.sparkProperties, submissionId, submitDate)
-  }
-
-  protected override def handleSubmit(
-      requestMessageJson: String,
-      requestMessage: SubmitRestProtocolMessage,
-      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
-    requestMessage match {
-      case submitRequest: CreateSubmissionRequest =>
-        val driverDescription = buildDriverDescription(submitRequest)
-        val s = scheduler.submitDriver(driverDescription)
-        s.serverSparkVersion = sparkVersion
-        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
-        if (unknownFields.nonEmpty) {
-          // If there are fields that the server does not know about, warn the client
-          s.unknownFields = unknownFields
-        }
-        s
-      case unexpected =>
-        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
-        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
-    }
-  }
-}
-
-private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
-  extends KillRequestServlet {
-  protected override def handleKill(submissionId: String): KillSubmissionResponse = {
-    val k = scheduler.killDriver(submissionId)
-    k.serverSparkVersion = sparkVersion
-    k
-  }
-}
-
-private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
-  extends StatusRequestServlet {
-  protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val d = scheduler.getDriverStatus(submissionId)
-    d.serverSparkVersion = sparkVersion
-    d
-  }
-}
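
For readers tracing submission IDs through the dispatcher pages above: `newDriverId` combines a `yyyyMMddHHmmss` timestamp with a zero-padded per-server counter. A standalone, hypothetical demo of the format (the logic is copied from `MesosSubmitRequestServlet`; the demo object itself is not part of this commit):

```scala
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.atomic.AtomicLong

// Hypothetical demo reproducing the submission-ID scheme used above,
// e.g. "driver-20160826122522-0001".
object DriverIdDemo {
  private val nextDriverNumber = new AtomicLong(0)
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  def newDriverId(submitDate: Date): String =
    "driver-%s-%04d".format(
      createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())

  def main(args: Array[String]): Unit =
    println(newDriverId(new Date())) // e.g. driver-20160826122522-0001
}
```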

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
deleted file mode 100644
index 680cfb7..0000000
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
-import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
-import org.apache.mesos.protobuf.ByteString
-
-import org.apache.spark.{SparkConf, SparkEnv, TaskState}
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
-import org.apache.spark.util.Utils
-
-private[spark] class MesosExecutorBackend
-  extends MesosExecutor
-  with ExecutorBackend
-  with Logging {
-
-  var executor: Executor = null
-  var driver: ExecutorDriver = null
-
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
-    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
-    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
-      .setTaskId(mesosTaskId)
-      .setState(TaskState.toMesos(state))
-      .setData(ByteString.copyFrom(data))
-      .build())
-  }
-
-  override def registered(
-      driver: ExecutorDriver,
-      executorInfo: ExecutorInfo,
-      frameworkInfo: FrameworkInfo,
-      slaveInfo: SlaveInfo) {
-
-    // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
-    val cpusPerTask = executorInfo.getResourcesList.asScala
-      .find(_.getName == "cpus")
-      .map(_.getScalar.getValue.toInt)
-      .getOrElse(0)
-    val executorId = executorInfo.getExecutorId.getValue
-
-    logInfo(s"Registered with Mesos as executor ID $executorId with 
$cpusPerTask cpus")
-    this.driver = driver
-    // Set a context class loader to be picked up by the serializer. Without this call
-    // the serializer would default to the null class loader, and fail to find Spark classes
-    // See SPARK-10986.
-    Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader)
-
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
-      Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
-    val conf = new SparkConf(loadDefaults = true).setAll(properties)
-    val port = conf.getInt("spark.executor.port", 0)
-    val env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false)
-
-    executor = new Executor(
-      executorId,
-      slaveInfo.getHostname,
-      env)
-  }
-
-  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
-    val taskId = taskInfo.getTaskId.getValue.toLong
-    val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
-    if (executor == null) {
-      logError("Received launchTask but executor was null")
-    } else {
-      SparkHadoopUtil.get.runAsSparkUser { () =>
-        executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber,
-          taskInfo.getName, taskData.serializedTask)
-      }
-    }
-  }
-
-  override def error(d: ExecutorDriver, message: String) {
-    logError("Error from Mesos: " + message)
-  }
-
-  override def killTask(d: ExecutorDriver, t: TaskID) {
-    if (executor == null) {
-      logError("Received KillTask but executor was null")
-    } else {
-      // TODO: Determine the 'interruptOnCancel' property set for the given job.
-      executor.killTask(t.getValue.toLong, interruptThread = false)
-    }
-  }
-
-  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
-
-  override def disconnected(d: ExecutorDriver) {}
-
-  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
-
-  override def shutdown(d: ExecutorDriver) {}
-}
-
-/**
- * Entry point for Mesos executor.
- */
-private[spark] object MesosExecutorBackend extends Logging {
-  def main(args: Array[String]) {
-    Utils.initDaemon(log)
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
-  }
-}

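For orientation, the heart of the file just removed is the statusUpdate path: it converts a Spark TaskState into a Mesos TaskStatus protobuf and sends it through the ExecutorDriver. A standalone sketch of that pattern, using only calls that appear in the deleted file (note this commit also moves the TaskState.toMesos helper out of core, so post-move the import would change):

  import java.nio.ByteBuffer

  import org.apache.mesos.ExecutorDriver
  import org.apache.mesos.Protos.{TaskID, TaskStatus => MesosTaskStatus}
  import org.apache.mesos.protobuf.ByteString

  import org.apache.spark.TaskState
  import org.apache.spark.TaskState.TaskState

  object StatusUpdateSketch {
    // Mirror of the backend's statusUpdate: wrap the task result bytes and the
    // translated task state into a protobuf and report it back to Mesos.
    def report(driver: ExecutorDriver, taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
      val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
      driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
        .setTaskId(mesosTaskId)
        .setState(TaskState.toMesos(state))  // Spark TaskState -> Mesos TaskState
        .setData(ByteString.copyFrom(data))  // opaque serialized task result
        .build())
    }
  }
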
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
deleted file mode 100644
index 61ab3e8..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-
-import org.apache.curator.framework.CuratorFramework
-import org.apache.zookeeper.CreateMode
-import org.apache.zookeeper.KeeperException.NoNodeException
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkCuratorUtil
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * Persistence engine factory that is responsible for creating new persistence engines
- * to store Mesos cluster mode state.
- */
-private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
-  def createEngine(path: String): MesosClusterPersistenceEngine
-}
-
-/**
- * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
- * specific state, so that on failover all the state can be recovered and the scheduler
- * can resume managing the drivers.
- */
-private[spark] trait MesosClusterPersistenceEngine {
-  def persist(name: String, obj: Object): Unit
-  def expunge(name: String): Unit
-  def fetch[T](name: String): Option[T]
-  def fetchAll[T](): Iterable[T]
-}
-
-/**
- * Zookeeper backed persistence engine factory.
- * All Zk engines created from this factory shares the same Zookeeper client, so
- * all of them reuses the same connection pool.
- */
-private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
-  extends MesosClusterPersistenceEngineFactory(conf) with Logging {
-
-  lazy val zk = SparkCuratorUtil.newClient(conf)
-
-  def createEngine(path: String): MesosClusterPersistenceEngine = {
-    new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
-  }
-}
-
-/**
- * Black hole persistence engine factory that creates black hole
- * persistence engines, which stores nothing.
- */
-private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
-  extends MesosClusterPersistenceEngineFactory(null) {
-  def createEngine(path: String): MesosClusterPersistenceEngine = {
-    new BlackHoleMesosClusterPersistenceEngine
-  }
-}
-
-/**
- * Black hole persistence engine that stores nothing.
- */
-private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
-  override def persist(name: String, obj: Object): Unit = {}
-  override def fetch[T](name: String): Option[T] = None
-  override def expunge(name: String): Unit = {}
-  override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
-}
-
-/**
- * Zookeeper based Mesos cluster persistence engine, that stores cluster mode state
- * into Zookeeper. Each engine object is operating under one folder in Zookeeper, but
- * reuses a shared Zookeeper client.
- */
-private[spark] class ZookeeperMesosClusterPersistenceEngine(
-    baseDir: String,
-    zk: CuratorFramework,
-    conf: SparkConf)
-  extends MesosClusterPersistenceEngine with Logging {
-  private val WORKING_DIR =
-    conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + 
baseDir
-
-  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
-
-  def path(name: String): String = {
-    WORKING_DIR + "/" + name
-  }
-
-  override def expunge(name: String): Unit = {
-    zk.delete().forPath(path(name))
-  }
-
-  override def persist(name: String, obj: Object): Unit = {
-    val serialized = Utils.serialize(obj)
-    val zkPath = path(name)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
-  }
-
-  override def fetch[T](name: String): Option[T] = {
-    val zkPath = path(name)
-
-    try {
-      val fileData = zk.getData().forPath(zkPath)
-      Some(Utils.deserialize[T](fileData))
-    } catch {
-      case e: NoNodeException => None
-      case e: Exception =>
-        logWarning("Exception while reading persisted file, deleting", e)
-        zk.delete().forPath(zkPath)
-        None
-    }
-  }
-
-  override def fetchAll[T](): Iterable[T] = {
-    zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
-  }
-}

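To make the contract above concrete: the scheduler obtains one engine per state folder from the factory, writes through persist on every state change, and replays fetchAll on failover. A hedged usage sketch; the recovery-mode key and the literals below are illustrative, not read by this file:

  package org.apache.spark.scheduler.cluster.mesos

  import org.apache.spark.SparkConf

  private[spark] object PersistenceEngineSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      // Choose ZooKeeper-backed persistence for HA, or the black hole otherwise.
      val factory: MesosClusterPersistenceEngineFactory =
        if (conf.get("spark.deploy.recoveryMode", "NONE").equalsIgnoreCase("zookeeper")) {
          new ZookeeperMesosClusterPersistenceEngineFactory(conf)  // shares one Curator client
        } else {
          new BlackHoleMesosClusterPersistenceEngineFactory        // persists nothing
        }

      val engine = factory.createEngine("launchedDrivers")      // folder under the ZK working dir
      engine.persist("driver-0001", "serialized driver state")  // illustrative payload
      engine.fetchAll[String]().foreach(println)                 // replayed on failover
    }
  }
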
