Repository: spark
Updated Branches:
  refs/heads/master e4e8d8f39 -> 698373211


http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
deleted file mode 100644
index bfa8f84..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, Logging}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-
-trait WorkerRunnableUtil extends Logging {
-
-  val yarnConf: YarnConfiguration
-  val sparkConf: SparkConf
-  lazy val env = prepareEnvironment
-
-  def prepareCommand(
-      masterAddress: String,
-      slaveId: String,
-      hostname: String,
-      workerMemory: Int,
-      workerCores: Int) = {
-    // Extra options for the JVM
-    var JAVA_OPTS = ""
-    // Set the JVM memory
-    val workerMemoryString = workerMemory + "m"
-    JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + 
" "
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
-    }
-
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), 
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove
-    // it once cpuset version is pushed out.
-    // The context is, default gc for server class machines end up using all cores to do gc - hence
-    // if there are multiple containers in same node, spark gc effects all other containers
-    // performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
-    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
-    // of cores on a node.
-    /*
-        else {
-          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
-          // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
-          // want to mess with it.
-          // In our expts, using (default) throughput collector has severe perf ramnifications in
-          // multi-tennent machines
-          // The options are based on
-          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-          JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-          JAVA_OPTS += " -XX:+CMSIncrementalMode "
-          JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
-        }
-    */
-
-    var javaCommand = "java"
-    val javaHome = System.getenv("JAVA_HOME")
-    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
-      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
-    }
-
-    val commands = List[String](javaCommand +
-      " -server " +
-      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
-      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
-      // an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
-      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      workerCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    commands
-  }
-
-  private def setupDistributedCache(
-      file: String,
-      rtype: LocalResourceType,
-      localResources: HashMap[String, LocalResource],
-      timestamp: String,
-      size: String, 
-      vis: String) = {
-    val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
-    amJarRsrc.setTimestamp(timestamp.toLong)
-    amJarRsrc.setSize(size.toLong)
-    localResources(uri.getFragment()) = amJarRsrc
-  }
-
-  def prepareLocalResources: HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    val localResources = HashMap[String, LocalResource]()
-
-    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
-      for( i <- 0 to distFiles.length - 1) {
-        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i), visibilities(i))
-      }
-    }
-
-    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
-      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
-      for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
-          timeStamps(i), fileSizes(i), visibilities(i))
-      }
-    }
-
-    logInfo("Prepared Local resources " + localResources)
-    localResources
-  }
-
-  def prepareEnvironment: HashMap[String, String] = {
-    val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
-
-    // Allow users to specify some environment variables
-    Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
-
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-    env
-  }
-
-}
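
For reference, the container start command assembled by prepareCommand above ends up looking roughly like this (a sketch: the 1024m heap value, the tmpdir expansion, and the <...> placeholders are illustrative; "<LOG_DIR>" is the literal value of ApplicationConstants.LOG_DIR_EXPANSION_VAR, which YARN expands at launch):

    java -server -Xms1024m -Xmx1024m -Djava.io.tmpdir=$PWD/tmp \
      -XX:OnOutOfMemoryError='kill %p' \
      org.apache.spark.executor.CoarseGrainedExecutorBackend \
      <masterAddress> <slaveId> <hostname> <workerCores> \
      1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr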

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 522e0a9..6b91e6b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.Utils
 
 /**
  *
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
  */
private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
 
@@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
 
   override def postStartHook() {
 
-    // The yarn application is running, but the worker might not yet ready
+    // The yarn application is running, but the executors might not be ready yet
     // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
     Thread.sleep(2000L)
     logInfo("YarnClientClusterScheduler.postStartHook done")

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e7130d2..d1f13e3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend(
       "--class", "notused",
       "--jar", null,
       "--args", hostport,
-      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+      "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
     )
 
    // process any optional arguments, use the defaults already defined in ClientArguments
    // if things aren't specified
-    Map("--master-memory" -> "SPARK_MASTER_MEMORY",
-      "--num-workers" -> "SPARK_WORKER_INSTANCES",
-      "--worker-memory" -> "SPARK_WORKER_MEMORY",
-      "--worker-cores" -> "SPARK_WORKER_CORES",
-      "--queue" -> "SPARK_YARN_QUEUE",
-      "--name" -> "SPARK_YARN_APP_NAME",
-      "--files" -> "SPARK_YARN_DIST_FILES",
-      "--archives" -> "SPARK_YARN_DIST_ARCHIVES")
-    .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
+    Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
+      "SPARK_DRIVER_MEMORY" -> "--driver-memory",
+      "SPARK_WORKER_INSTANCES" -> "--num-executors",
+      "SPARK_WORKER_MEMORY" -> "--executor-memory",
+      "SPARK_WORKER_CORES" -> "--executor-cores",
+      "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
+      "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
+      "SPARK_EXECUTOR_CORES" -> "--executor-cores",
+      "SPARK_YARN_QUEUE" -> "--queue",
+      "SPARK_YARN_APP_NAME" -> "--name",
+      "SPARK_YARN_DIST_FILES" -> "--files",
+      "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
+    .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
       
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
@@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   def waitForApp() {
 
-    // TODO : need a better way to find out whether the workers are ready or not
+    // TODO : need a better way to find out whether the executors are ready or not
     // maybe by resource usage report?
     while(true) {
       val report = client.getApplicationReport(appId)
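
A note on the remapped table above: the Map is now keyed by environment variable rather than by option name, since two variables (the deprecated SPARK_WORKER_* names and their SPARK_EXECUTOR_* replacements) can feed the same flag, which a flag-keyed Map could not express. The loop it drives has roughly this shape (a sketch; this addArg is a hypothetical stand-in for the helper defined elsewhere in this file):

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in: append "--flag value" to the argument buffer
    // only when the corresponding environment variable is set.
    def addArg(optName: String, envVar: String, buf: ArrayBuffer[String]) {
      val value = System.getenv(envVar)
      if (value != null) {
        buf += optName
        buf += value
      }
    }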

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 57d1577..30735cb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private var isLastAMRetry: Boolean = true
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  // Default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
-    math.max(args.numWorkers * 2, 3))
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   private var registered = false
   
@@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     // Call this to force generation of secret so it gets populated into the
     // hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the worker containers.
+    // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
     // Start the user's JAR
@@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
 
     // Allocate all containers
-    allocateWorkers()
+    allocateExecutors()
 
     // Wait for the user class to Finish
     userThread.join()
@@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     t
   }
 
-  // This need to happen before allocateWorkers()
+  // This needs to happen before allocateExecutors()
   private def waitForSparkContextInitialized() {
     logInfo("Waiting for Spark context initialization")
     try {
@@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
-  private def allocateWorkers() {
+  private def allocateExecutors() {
     try {
-      logInfo("Allocating " + args.numWorkers + " workers.")
+      logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure
-      yarnAllocator.addResourceRequests(args.numWorkers)
+      yarnAllocator.addResourceRequests(args.numExecutors)
       // Exits the loop if the user thread exits.
-      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
-        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
           finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of worker failures reached")
+            "max number of executor failures reached")
         }
         yarnAllocator.allocateResources()
         ApplicationMaster.incrementAllocatorLoop(1)
@@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
-    logInfo("All workers have launched.")
+    logInfo("All executors have launched.")
 
    // Launch a progress reporter thread, else the app will get killed after expiration
    // (def: 10mins) timeout.
@@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
             finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of worker failures reached")
+              "max number of executor failures reached")
           }
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
            yarnAllocator.getNumPendingAllocate
-          if (missingWorkerCount > 0) {
+          if (missingExecutorCount > 0) {
            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingWorkerCount))
-            yarnAllocator.addResourceRequests(missingWorkerCount)
+              format(missingExecutorCount))
+            yarnAllocator.addResourceRequests(missingExecutorCount)
           }
           sendProgress()
           Thread.sleep(sleepTime)
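
The maxNumExecutorFailures hunk above shows the backward-compatibility pattern used for renamed settings: read the new key, fall back to the deprecated key, and only then compute the default. A minimal sketch (the numExecutors value is illustrative; in the real code it comes from args.numExecutors):

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
    val numExecutors = 4  // illustrative
    // "spark.yarn.max.executor.failures" wins if set; the deprecated
    // "spark.yarn.max.worker.failures" is still honored; otherwise 2x executors, min 3.
    val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
      sparkConf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))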

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
new file mode 100644
index 0000000..b697f10
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.Socket
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import akka.actor._
+import akka.remote._
+import akka.actor.Terminated
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
+
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+  private var appAttemptId: ApplicationAttemptId = _
+  private var reporterThread: Thread = _
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  private var yarnAllocator: YarnAllocationHandler = _
+  private var driverClosed:Boolean = false
+
+  private var amClient: AMRMClient[ContainerRequest] = _
+
+  val securityManager = new SecurityManager(sparkConf)
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+    conf = sparkConf, securityManager = securityManager)._1
+  var actor: ActorRef = _
+
+  // This actor just works as a monitor to watch the driver actor.
+  class MonitorActor(driverUrl: String) extends Actor {
+
+    var driver: ActorSelection = _
+
+    override def preStart() {
+      logInfo("Listen to driver: " + driverUrl)
+      driver = context.actorSelection(driverUrl)
+      // Send a hello message to establish the connection, so that we can monitor lifecycle events.
+      driver ! "Hello"
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    }
+
+    override def receive = {
+      case x: DisassociatedEvent =>
+        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+        driverClosed = true
+    }
+  }
+
+  def run() {
+
+    // Setup the directories so things go to yarn approved directories rather
+    // than user specified and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(yarnConf)
+    amClient.start()
+
+    appAttemptId = getApplicationAttemptId()
+    registerApplicationMaster()
+
+    waitForSparkMaster()
+
+    // Allocate all containers
+    allocateExecutors()
+
+    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+
+    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
+    reporterThread = launchReporterThread(interval)
+    
+
+    // Wait for the reporter thread to Finish.
+    reporterThread.join()
+
+    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+    actorSystem.shutdown()
+
+    logInfo("Exited")
+    System.exit(0)
+  }
+
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+ 
+    localDirs match {
+      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case Some(l) => l
+    }
+  } 
+
+  private def getApplicationAttemptId(): ApplicationAttemptId = {
+    val envs = System.getenv()
+    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
+    appAttemptId
+  }
+
+  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+    logInfo("Registering the ApplicationMaster")
+    // TODO:(Raymond) Find out Spark UI address and fill in here?
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+  }
+
+  private def waitForSparkMaster() {
+    logInfo("Waiting for Spark driver to be reachable.")
+    var driverUp = false
+    val hostport = args.userArgs(0)
+    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+    while(!driverUp) {
+      try {
+        val socket = new Socket(driverHost, driverPort)
+        socket.close()
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+        driverUp = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to driver at %s:%s, retrying ...".
+            format(driverHost, driverPort))
+        Thread.sleep(100)
+      }
+    }
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
+
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+      driverHost, driverPort.toString, 
CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = 
"YarnAM")
+  }
+
+
+  private def allocateExecutors() {
+
+    // Fixme: should get preferredNodeLocationData from SparkContext, just fake an empty one for now.
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map()
+
+    yarnAllocator = YarnAllocationHandler.newAllocator(
+      yarnConf,
+      amClient,
+      appAttemptId,
+      args,
+      preferredNodeLocationData,
+      sparkConf)
+
+    logInfo("Allocating " + args.numExecutors + " executors.")
+    // Wait until all containers have finished
+    // TODO: This is a bit ugly. Can we make it nicer?
+    // TODO: Handle container failure
+
+    yarnAllocator.addResourceRequests(args.numExecutors)
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      yarnAllocator.allocateResources()
+      Thread.sleep(100)
+    }
+
+    logInfo("All executors have launched.")
+
+  }
+
+  // TODO: We might want to extend this to allocate more containers in case they die !
+  private def launchReporterThread(_sleepTime: Long): Thread = {
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+
+    val t = new Thread {
+      override def run() {
+        while (!driverClosed) {
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+            yarnAllocator.getNumPendingAllocate
+          if (missingExecutorCount > 0) {
+            logInfo("Allocating %d containers to make up for (potentially) lost containers".
+              format(missingExecutorCount))
+            yarnAllocator.addResourceRequests(missingExecutorCount)
+          }
+          sendProgress()
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    // setting to daemon status, though this is usually not a good idea.
+    t.setDaemon(true)
+    t.start()
+    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    t
+  }
+
+  private def sendProgress() {
+    logDebug("Sending progress")
+    // simulated with an allocate request with no nodes requested ...
+    yarnAllocator.allocateResources()
+  }
+
+  def finishApplicationMaster(status: FinalApplicationStatus) {
+    logInfo("finish ApplicationMaster with " + status)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+  }
+
+}
+
+
+object ExecutorLauncher {
+  def main(argStrings: Array[String]) {
+    val args = new ApplicationMasterArguments(argStrings)
+    new ExecutorLauncher(args).run()
+  }
+}
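
The reporter interval computed in run() above guarantees at least two heartbeats per AM-expiry window. Plugging in the defaults that appear in the code (a worked sketch, not new behavior):

    val timeoutInterval = 120000L  // default used for RM_AM_EXPIRY_INTERVAL_MS above
    val schedulerInterval = 5000L  // default for spark.yarn.scheduler.heartbeat.interval-ms
    val interval = math.min(timeoutInterval / 2, schedulerInterval)  // min(60000, 5000) = 5000 ms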

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
new file mode 100644
index 0000000..53c403f
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+
+import org.apache.spark.{SparkConf, Logging}
+
+
+class ExecutorRunnable(
+    container: Container,
+    conf: Configuration,
+    spConf: SparkConf,
+    masterAddress: String,
+    slaveId: String,
+    hostname: String,
+    executorMemory: Int,
+    executorCores: Int)
+  extends Runnable with ExecutorRunnableUtil with Logging {
+
+  var rpc: YarnRPC = YarnRPC.create(conf)
+  var nmClient: NMClient = _
+  val sparkConf = spConf
+  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  def run = {
+    logInfo("Starting Executor Container")
+    nmClient = NMClient.createNMClient()
+    nmClient.init(yarnConf)
+    nmClient.start()
+    startContainer
+  }
+
+  def startContainer = {
+    logInfo("Setting up ContainerLaunchContext")
+
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+      .asInstanceOf[ContainerLaunchContext]
+
+    val localResources = prepareLocalResources
+    ctx.setLocalResources(localResources)
+
+    ctx.setEnvironment(env)
+
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    ctx.setTokens(ByteBuffer.wrap(dob.getData()))
+
+    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
+
+    logInfo("Setting up executor with commands: " + commands)
+    ctx.setCommands(commands)
+
+    // Send the start request to the ContainerManager
+    nmClient.startContainer(container, ctx)
+  }
+
+}
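
As the YarnAllocationHandler hunks further down show, each allocated container gets one of these runnables started on its own thread. A condensed usage sketch (container, conf, sparkConf, driverUrl and the executor* values are assumed to be in scope, as they are at the real call site):

    val executorRunnable = new ExecutorRunnable(container, conf, sparkConf,
      driverUrl, executorId, executorHostname, executorMemory, executorCores)
    new Thread(executorRunnable).start()  // run() creates an NMClient and starts the container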

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
deleted file mode 100644
index f1c1fea..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
-  extends Logging {
-
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
-
-  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
-  private var appAttemptId: ApplicationAttemptId = _
-  private var reporterThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
-
-  private var amClient: AMRMClient[ContainerRequest] = _
-
-  val securityManager = new SecurityManager(sparkConf)
-  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = sparkConf, securityManager = securityManager)._1
-  var actor: ActorRef = _
-
-  // This actor just working as a monitor to watch on Driver Actor.
-  class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = _
-
-    override def preStart() {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
-      driver ! "Hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        driverClosed = true
-    }
-  }
-
-  def run() {
-
-    // Setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
-    amClient = AMRMClient.createAMRMClient()
-    amClient.init(yarnConf)
-    amClient.start()
-
-    appAttemptId = getApplicationAttemptId()
-    registerApplicationMaster()
-
-    waitForSparkMaster()
-
-    // Allocate all containers
-    allocateWorkers()
-
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
-    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
-    // must be <= timeoutInterval / 2.
-    val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-    reporterThread = launchReporterThread(interval)
-    
-
-    // Wait for the reporter thread to Finish.
-    reporterThread.join()
-
-    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-    actorSystem.shutdown()
-
-    logInfo("Exited")
-    System.exit(0)
-  }
-
-  /** Get the Yarn approved local directories. */
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .orElse(Option(System.getenv("LOCAL_DIRS")))
- 
-    localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
-      case Some(l) => l
-    }
-  } 
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    // TODO:(Raymond) Find out Spark UI address and fill in here?
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
-  }
-
-  private def waitForSparkMaster() {
-    logInfo("Waiting for Spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while(!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-        Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host", driverHost)
-    sparkConf.set("spark.driver.port", driverPort.toString)
-
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, 
CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = 
"YarnAM")
-  }
-
-
-  private def allocateWorkers() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map()
-
-    yarnAllocator = YarnAllocationHandler.newAllocator(
-      yarnConf,
-      amClient,
-      appAttemptId,
-      args,
-      preferredNodeLocationData,
-      sparkConf)
-
-    logInfo("Allocating " + args.numWorkers + " workers.")
-    // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-
-    yarnAllocator.addResourceRequests(args.numWorkers)
-    while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
-      yarnAllocator.allocateResources()
-      Thread.sleep(100)
-    }
-
-    logInfo("All workers have launched.")
-
-  }
-
-  // TODO: We might want to extend this to allocate more containers in case they die !
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (!driverClosed) {
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingWorkerCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingWorkerCount))
-            yarnAllocator.addResourceRequests(missingWorkerCount)
-          }
-          sendProgress()
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    t
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateResources()
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
-  }
-
-}
-
-
-object WorkerLauncher {
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
deleted file mode 100644
index ab4a79b..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ /dev/null
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.client.api.NMClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, Logging}
-
-
-class WorkerRunnable(
-    container: Container,
-    conf: Configuration,
-    spConf: SparkConf,
-    masterAddress: String,
-    slaveId: String,
-    hostname: String,
-    workerMemory: Int,
-    workerCores: Int) 
-  extends Runnable with WorkerRunnableUtil with Logging {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  var nmClient: NMClient = _
-  val sparkConf = spConf
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def run = {
-    logInfo("Starting Worker Container")
-    nmClient = NMClient.createNMClient()
-    nmClient.init(yarnConf)
-    nmClient.start()
-    startContainer
-  }
-
-  def startContainer = {
-    logInfo("Setting up ContainerLaunchContext")
-
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-      .asInstanceOf[ContainerLaunchContext]
-
-    val localResources = prepareLocalResources
-    ctx.setLocalResources(localResources)
-
-    ctx.setEnvironment(env)
-
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    ctx.setTokens(ByteBuffer.wrap(dob.getData()))
-
-    val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
-
-    logInfo("Setting up worker with commands: " + commands)
-    ctx.setCommands(commands)
-
-    // Send the start request to the ContainerManager
-    nmClient.startContainer(container, ctx)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 1ac6112..e31c406 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler(
     val conf: Configuration,
     val amClient: AMRMClient[ContainerRequest],
     val appAttemptId: ApplicationAttemptId,
-    val maxWorkers: Int,
-    val workerMemory: Int,
-    val workerCores: Int,
+    val maxExecutors: Int,
+    val executorMemory: Int,
+    val executorCores: Int,
     val preferredHostToCount: Map[String, Int], 
     val preferredRackToCount: Map[String, Int],
     val sparkConf: SparkConf)
@@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler(
  // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
-  private val numWorkersRunning = new AtomicInteger()
-  // Used to generate a unique id per worker
-  private val workerIdCounter = new AtomicInteger()
+  private val numExecutorsRunning = new AtomicInteger()
+  // Used to generate a unique id per executor
+  private val executorIdCounter = new AtomicInteger()
   private val lastResponseId = new AtomicInteger()
-  private val numWorkersFailed = new AtomicInteger()
+  private val numExecutorsFailed = new AtomicInteger()
 
   def getNumPendingAllocate: Int = numPendingAllocate.intValue
 
-  def getNumWorkersRunning: Int = numWorkersRunning.intValue
+  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
-  def getNumWorkersFailed: Int = numWorkersFailed.intValue
+  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
   }
 
   def releaseContainer(container: Container) {
@@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler(
 
       logDebug("""
         Allocated containers: %d
-        Current worker count: %d
+        Current executor count: %d
         Containers released: %s
         Containers to-be-released: %s
         Cluster resources: %s
         """.format(
           allocatedContainers.size,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers,
           allocateResponse.getAvailableResources))
@@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler(
 
       // Run each of the allocated containers.
       for (container <- allocatedContainersToProcess) {
-        val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
-        val workerHostname = container.getNodeId.getHost
+        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+        val executorHostname = container.getNodeId.getHost
         val containerId = container.getId
 
-        val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
-        assert(container.getResource.getMemory >= workerMemoryOverhead)
+        val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+        assert(container.getResource.getMemory >= executorMemoryOverhead)
 
-        if (numWorkersRunningNow > maxWorkers) {
+        if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have 
the required number of
-            containers for it.""".format(containerId, workerHostname))
+            containers for it.""".format(containerId, executorHostname))
           releaseContainer(container)
-          numWorkersRunning.decrementAndGet()
+          numExecutorsRunning.decrementAndGet()
         } else {
-          val workerId = workerIdCounter.incrementAndGet().toString
+          val executorId = executorIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
             sparkConf.get("spark.driver.host"),
             sparkConf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
-          logInfo("Launching container %s for on host %s".format(containerId, 
workerHostname))
+          logInfo("Launching container %s for on host %s".format(containerId, 
executorHostname))
 
           // To be safe, remove the container from `pendingReleaseContainers`.
           pendingReleaseContainers.remove(containerId)
 
-          val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+          val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
           allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
               new HashSet[ContainerId]())
 
             containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, workerHostname)
+            allocatedContainerToHostMap.put(containerId, executorHostname)
 
             if (rack != null) {
              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
             }
           }
-          logInfo("Launching WorkerRunnable. driverUrl: %s,  workerHostname: 
%s".format(driverUrl, workerHostname))
-          val workerRunnable = new WorkerRunnable(
+          logInfo("Launching ExecutorRunnable. driverUrl: %s,  
executorHostname: %s".format(driverUrl, executorHostname))
+          val executorRunnable = new ExecutorRunnable(
             container,
             conf,
             sparkConf,
             driverUrl,
-            workerId,
-            workerHostname,
-            workerMemory,
-            workerCores)
-          new Thread(workerRunnable).start()
+            executorId,
+            executorHostname,
+            executorMemory,
+            executorCores)
+          new Thread(executorRunnable).start()
         }
       }
       logDebug("""
         Finished allocating %s containers (from %s originally).
-        Current number of workers running: %d,
+        Current number of executors running: %d,
         releasedContainerList: %s,
         pendingReleaseContainers: %s
         """.format(
           allocatedContainersToProcess,
           allocatedContainers,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers))
     }
@@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler(
           // `pendingReleaseContainers`.
           pendingReleaseContainers.remove(containerId)
         } else {
-          // Decrement the number of workers running. The next iteration of the ApplicationMaster's
+          // Decrement the number of executors running. The next iteration of the ApplicationMaster's
           // reporting thread will take care of allocating.
-          numWorkersRunning.decrementAndGet()
+          numExecutorsRunning.decrementAndGet()
           logInfo("Completed container %s (state: %s, exit status: %s)".format(
             containerId,
             completedContainer.getState,
@@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler(
           // now I think its ok as none of the containers are expected to exit
           if (completedContainer.getExitStatus() != 0) {
             logInfo("Container marked as failed: " + containerId)
-            numWorkersFailed.incrementAndGet()
+            numExecutorsFailed.incrementAndGet()
           }
         }
 
@@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler(
       }
       logDebug("""
         Finished processing %d completed containers.
-        Current number of workers running: %d,
+        Current number of executors running: %d,
         releasedContainerList: %s,
         pendingReleaseContainers: %s
         """.format(
           completedContainers.size,
-          numWorkersRunning.get(),
+          numExecutorsRunning.get(),
           releasedContainerList,
           pendingReleaseContainers))
     }
@@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler(
     retval
   }
 
-  def addResourceRequests(numWorkers: Int) {
+  def addResourceRequests(numExecutors: Int) {
     val containerRequests: List[ContainerRequest] =
-      if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
-        logDebug("numWorkers: " + numWorkers + ", host preferences: " +
+      if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
+        logDebug("numExecutors: " + numExecutors + ", host preferences: " +
           preferredHostToCount.isEmpty)
         createResourceRequests(
           AllocationType.ANY,
           resource = null,
-          numWorkers,
+          numExecutors,
           YarnAllocationHandler.PRIORITY).toList
       } else {
-        // Request for all hosts in preferred nodes and for numWorkers - 
+        // Request for all hosts in preferred nodes and for numExecutors - 
         // candidates.size, request by default allocation policy.
        val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
         for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler(
         val anyContainerRequests = createResourceRequests(
           AllocationType.ANY,
           resource = null,
-          numWorkers,
+          numExecutors,
           YarnAllocationHandler.PRIORITY)
 
         val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
@@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler(
       amClient.addContainerRequest(request)
     }
 
-    if (numWorkers > 0) {
-      numPendingAllocate.addAndGet(numWorkers)
-      logInfo("Will Allocate %d worker containers, each with %d memory".format(
-        numWorkers,
-        (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+    if (numExecutors > 0) {
+      numPendingAllocate.addAndGet(numExecutors)
+      logInfo("Will Allocate %d executor containers, each with %d 
memory".format(
+        numExecutors,
+        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
     } else {
       logDebug("Empty allocation request ...")
     }
@@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
   private def createResourceRequests(
       requestType: AllocationType.AllocationType,
       resource: String,
-      numWorkers: Int,
+      numExecutors: Int,
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
@@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler(
         val nodeLocal = constructContainerRequests(
           Array(hostname),
           racks = null,
-          numWorkers,
+          numExecutors,
           priority)
 
        // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
@@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler(
       }
       case AllocationType.RACK => {
         val rack = resource
-        constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
+        constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
       }
       case AllocationType.ANY => constructContainerRequests(
-        hosts = null, racks = null, numWorkers, priority)
+        hosts = null, racks = null, numExecutors, priority)
       case _ => throw new IllegalArgumentException(
         "Unexpected/unsupported request type: " + requestType)
     }
@@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler(
   private def constructContainerRequests(
       hosts: Array[String],
       racks: Array[String],
-      numWorkers: Int,
+      numExecutors: Int,
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
-    val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    val resource = Resource.newInstance(memoryRequest, workerCores)
+    val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val resource = Resource.newInstance(memoryRequest, executorCores)
 
     val prioritySetting = Records.newRecord(classOf[Priority])
     prioritySetting.setPriority(priority)
 
     val requests = new ArrayBuffer[ContainerRequest]()
-    for (i <- 0 until numWorkers) {
+    for (i <- 0 until numExecutors) {
       requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
     }
     requests
@@ -574,9 +574,9 @@ object YarnAllocationHandler {
       conf,
       amClient,
       appAttemptId,
-      args.numWorkers, 
-      args.workerMemory,
-      args.workerCores,
+      args.numExecutors, 
+      args.executorMemory,
+      args.executorCores,
       Map[String, Int](),
       Map[String, Int](),
       sparkConf)
@@ -596,9 +596,9 @@ object YarnAllocationHandler {
       conf,
       amClient,
       appAttemptId,
-      args.numWorkers, 
-      args.workerMemory,
-      args.workerCores,
+      args.numExecutors, 
+      args.executorMemory,
+      args.executorCores,
       hostToSplitCount,
       rackToSplitCount,
       sparkConf)
@@ -608,9 +608,9 @@ object YarnAllocationHandler {
       conf: Configuration,
       amClient: AMRMClient[ContainerRequest],
       appAttemptId: ApplicationAttemptId,
-      maxWorkers: Int,
-      workerMemory: Int,
-      workerCores: Int,
+      maxExecutors: Int,
+      executorMemory: Int,
+      executorCores: Int,
       map: collection.Map[String, collection.Set[SplitInfo]],
       sparkConf: SparkConf
     ): YarnAllocationHandler = {
@@ -619,9 +619,9 @@ object YarnAllocationHandler {
       conf,
       amClient,
       appAttemptId,
-      maxWorkers,
-      workerMemory,
-      workerCores,
+      maxExecutors,
+      executorMemory,
+      executorCores,
       hostToCount,
       rackToCount,
       sparkConf)
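
One detail worth noting in the hunks above: every request asks YARN for executorMemory plus YarnAllocationHandler.MEMORY_OVERHEAD, and isResourceConstraintSatisfied checks the same sum when containers come back. A worked sketch (the 384 MB overhead is an assumption about the companion object's constant, and 1024 MB is an illustrative executor size):

    val executorMemory = 1024  // MB, illustrative
    val memoryOverhead = 384   // MB, assumed value of YarnAllocationHandler.MEMORY_OVERHEAD
    val memoryRequest = executorMemory + memoryOverhead  // each container requests 1408 MB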
