Repository: spark
Updated Branches:
  refs/heads/master e4e8d8f39 -> 698373211
http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala deleted file mode 100644 index bfa8f84..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.URI -import java.nio.ByteBuffer -import java.security.PrivilegedExceptionAction - -import scala.collection.JavaConversions._ -import scala.collection.mutable.HashMap - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} - -import org.apache.spark.{SparkConf, Logging} -import org.apache.hadoop.yarn.conf.YarnConfiguration - - -trait WorkerRunnableUtil extends Logging { - - val yarnConf: YarnConfiguration - val sparkConf: SparkConf - lazy val env = prepareEnvironment - - def prepareCommand( - masterAddress: String, - slaveId: String, - hostname: String, - workerMemory: Int, - workerCores: Int) = { - // Extra options for the JVM - var JAVA_OPTS = "" - // Set the JVM memory - val workerMemoryString = workerMemory + "m" - JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " " - if (env.isDefinedAt("SPARK_JAVA_OPTS")) { - JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " - } - - JAVA_OPTS += " -Djava.io.tmpdir=" + - new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - - // Commenting it out for now - so that people can refer to the properties if required. Remove - // it once cpuset version is pushed out. - // The context is, default gc for server class machines end up using all cores to do gc - hence - // if there are multiple containers in same node, spark gc effects all other containers - // performance (which can also be other spark containers) - // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in - // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset - // of cores on a node. 
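The option-building above boils down to: size the heap from the requested executor memory, append any user-supplied SPARK_JAVA_OPTS, and point java.io.tmpdir at a container-local directory. A minimal standalone sketch of that logic, using plain Scala values in place of the YARN Environment/Path helpers (the "./tmp" path and the env map are illustrative stand-ins, not the real container environment); the commented-out CMS alternative mentioned above follows below:

// Illustrative sketch only: mirrors the JAVA_OPTS assembly in prepareCommand
// with plain Scala values instead of the YARN Environment/Path helpers.
object ExecutorOptsSketch {
  def buildJavaOpts(executorMemoryMb: Int, env: Map[String, String]): String = {
    val mem = executorMemoryMb + "m"
    val heap = s"-Xms$mem -Xmx$mem"
    // Propagate user-supplied options, if any (SPARK_JAVA_OPTS in the env).
    val userOpts = env.get("SPARK_JAVA_OPTS").map(" " + _).getOrElse("")
    // Point java.io.tmpdir at a container-local dir ("./tmp" stands in for
    // Environment.PWD plus the default container temp dir).
    val tmpDir = " -Djava.io.tmpdir=./tmp"
    heap + userOpts + tmpDir
  }

  def main(args: Array[String]): Unit = {
    println(buildJavaOpts(1024, Map("SPARK_JAVA_OPTS" -> "-verbose:gc")))
    // -Xms1024m -Xmx1024m -verbose:gc -Djava.io.tmpdir=./tmp
  }
}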
- /* - else { - // If no java_opts specified, default to using -XX:+CMSIncrementalMode - // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont - // want to mess with it. - // In our expts, using (default) throughput collector has severe perf ramnifications in - // multi-tennent machines - // The options are based on - // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline - JAVA_OPTS += " -XX:+UseConcMarkSweepGC " - JAVA_OPTS += " -XX:+CMSIncrementalMode " - JAVA_OPTS += " -XX:+CMSIncrementalPacing " - JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " - JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " - } - */ - - var javaCommand = "java" - val javaHome = System.getenv("JAVA_HOME") - if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { - javaCommand = Environment.JAVA_HOME.$() + "/bin/java" - } - - val commands = List[String](javaCommand + - " -server " + - // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in - // an inconsistent state. - // TODO: If the OOM is not recoverable by rescheduling it on different node, then do - // 'something' to fail job ... akin to blacklisting trackers in mapred ? - " -XX:OnOutOfMemoryError='kill %p' " + - JAVA_OPTS + - " org.apache.spark.executor.CoarseGrainedExecutorBackend " + - masterAddress + " " + - slaveId + " " + - hostname + " " + - workerCores + - " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + - " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - - commands - } - - private def setupDistributedCache( - file: String, - rtype: LocalResourceType, - localResources: HashMap[String, LocalResource], - timestamp: String, - size: String, - vis: String) = { - val uri = new URI(file) - val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] - amJarRsrc.setType(rtype) - amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) - amJarRsrc.setTimestamp(timestamp.toLong) - amJarRsrc.setSize(size.toLong) - localResources(uri.getFragment()) = amJarRsrc - } - - def prepareLocalResources: HashMap[String, LocalResource] = { - logInfo("Preparing Local resources") - val localResources = HashMap[String, LocalResource]() - - if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') - val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',') - for( i <- 0 to distFiles.length - 1) { - setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), - fileSizes(i), visibilities(i)) - } - } - - if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') - val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',') - for( i <- 0 to distArchives.length - 1) { - setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, 
localResources, - timeStamps(i), fileSizes(i), visibilities(i)) - } - } - - logInfo("Prepared Local resources " + localResources) - localResources - } - - def prepareEnvironment: HashMap[String, String] = { - val env = new HashMap[String, String]() - - ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) - - // Allow users to specify some environment variables - Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) - - System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } - env - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 522e0a9..6b91e6b 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.Utils /** * - * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. + * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM. */ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { @@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur override def postStartHook() { - // The yarn application is running, but the worker might not yet ready + // The yarn application is running, but the executor might not yet ready // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt Thread.sleep(2000L) logInfo("YarnClientClusterScheduler.postStartHook done") http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index e7130d2..d1f13e3 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend( "--class", "notused", "--jar", null, "--args", hostport, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" + "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher" ) // process any optional arguments, use the defaults already defined in ClientArguments // if things aren't specified - Map("--master-memory" -> "SPARK_MASTER_MEMORY", - "--num-workers" -> "SPARK_WORKER_INSTANCES", - "--worker-memory" -> "SPARK_WORKER_MEMORY", - "--worker-cores" -> "SPARK_WORKER_CORES", - "--queue" -> "SPARK_YARN_QUEUE", - "--name" -> "SPARK_YARN_APP_NAME", - "--files" -> "SPARK_YARN_DIST_FILES", - "--archives" -> "SPARK_YARN_DIST_ARCHIVES") - .foreach { case (optName, optParam) => addArg(optName, optParam, 
argsArrayBuf) } + Map("SPARK_MASTER_MEMORY" -> "--driver-memory", + "SPARK_DRIVER_MEMORY" -> "--driver-memory", + "SPARK_WORKER_INSTANCES" -> "--num-executors", + "SPARK_WORKER_MEMORY" -> "--executor-memory", + "SPARK_WORKER_CORES" -> "--executor-cores", + "SPARK_EXECUTOR_INSTANCES" -> "--num-executors", + "SPARK_EXECUTOR_MEMORY" -> "--executor-memory", + "SPARK_EXECUTOR_CORES" -> "--executor-cores", + "SPARK_YARN_QUEUE" -> "--queue", + "SPARK_YARN_APP_NAME" -> "--name", + "SPARK_YARN_DIST_FILES" -> "--files", + "SPARK_YARN_DIST_ARCHIVES" -> "--archives") + .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) } logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) @@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend( def waitForApp() { - // TODO : need a better way to find out whether the workers are ready or not + // TODO : need a better way to find out whether the executors are ready or not // maybe by resource usage report? while(true) { val report = client.getApplicationReport(appId) http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 57d1577..30735cb 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var isLastAMRetry: Boolean = true private var amClient: AMRMClient[ContainerRequest] = _ - // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3)) + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false @@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // Call this to force generation of secret so it gets populated into the // hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the worker containers. + // doAs in order for the credentials to be passed on to the executor containers. 
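The maxNumExecutorFailures change above keeps the old spark.yarn.max.worker.failures key working by nesting it as the default of the new spark.yarn.max.executor.failures key. A minimal sketch of that deprecated-key fallback pattern, with the key names and default taken from the diff (the main method is purely illustrative):

import org.apache.spark.SparkConf

// Sketch of the nested-default pattern: prefer the new key, fall back to the
// old worker-based key, and finally to a computed default.
object MaxFailuresSketch {
  def maxNumExecutorFailures(conf: SparkConf, numExecutors: Int): Int =
    conf.getInt("spark.yarn.max.executor.failures",
      conf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.yarn.max.worker.failures", "7")
    println(maxNumExecutorFailures(conf, 10)) // prints 7: the old key is still honored
  }
}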
val securityMgr = new SecurityManager(sparkConf) // Start the user's JAR @@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } // Allocate all containers - allocateWorkers() + allocateExecutors() // Wait for the user class to Finish userThread.join() @@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, t } - // This need to happen before allocateWorkers() + // This need to happen before allocateExecutors() private def waitForSparkContextInitialized() { logInfo("Waiting for Spark context initialization") try { @@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } - private def allocateWorkers() { + private def allocateExecutors() { try { - logInfo("Allocating " + args.numWorkers + " workers.") + logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numWorkers) + yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. - while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) @@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } - logInfo("All workers have launched.") + logInfo("All executors have launched.") // Launch a progress reporter thread, else the app will get killed after expiration // (def: 10mins) timeout. @@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of worker failures reached") + "max number of executor failures reached") } - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - yarnAllocator.getNumPendingAllocate - if (missingWorkerCount > 0) { + if (missingExecutorCount > 0) { logInfo("Allocating %d containers to make up for (potentially) lost containers". 
- format(missingWorkerCount)) - yarnAllocator.addResourceRequests(missingWorkerCount) + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) } sendProgress() Thread.sleep(sleepTime) http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala new file mode 100644 index 0000000..b697f10 --- /dev/null +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.Socket +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.api.protocolrecords._ +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.{ConverterUtils, Records} +import akka.actor._ +import akka.remote._ +import akka.actor.Terminated +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.SplitInfo +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest + +class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) + extends Logging { + + def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = + this(args, new Configuration(), sparkConf) + + def this(args: ApplicationMasterArguments) = this(args, new SparkConf()) + + private var appAttemptId: ApplicationAttemptId = _ + private var reporterThread: Thread = _ + private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + + private var yarnAllocator: YarnAllocationHandler = _ + private var driverClosed:Boolean = false + + private var amClient: AMRMClient[ContainerRequest] = _ + + val securityManager = new SecurityManager(sparkConf) + val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf, securityManager = securityManager)._1 + var actor: ActorRef = _ + + // This actor just working as a monitor to watch on Driver Actor. 
+ class MonitorActor(driverUrl: String) extends Actor { + + var driver: ActorSelection = _ + + override def preStart() { + logInfo("Listen to driver: " + driverUrl) + driver = context.actorSelection(driverUrl) + // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events. + driver ! "Hello" + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + } + + override def receive = { + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") + driverClosed = true + } + } + + def run() { + + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + + amClient = AMRMClient.createAMRMClient() + amClient.init(yarnConf) + amClient.start() + + appAttemptId = getApplicationAttemptId() + registerApplicationMaster() + + waitForSparkMaster() + + // Allocate all containers + allocateExecutors() + + // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout + // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. + + val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + + reporterThread = launchReporterThread(interval) + + + // Wait for the reporter thread to Finish. + reporterThread.join() + + finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) + actorSystem.shutdown() + + logInfo("Exited") + System.exit(0) + } + + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l + } + } + + private def getApplicationAttemptId(): ApplicationAttemptId = { + val envs = System.getenv() + val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) + val containerId = ConverterUtils.toContainerId(containerIdString) + val appAttemptId = containerId.getApplicationAttemptId() + logInfo("ApplicationAttemptId: " + appAttemptId) + appAttemptId + } + + private def registerApplicationMaster(): RegisterApplicationMasterResponse = { + logInfo("Registering the ApplicationMaster") + // TODO:(Raymond) Find out Spark UI address and fill in here? + amClient.registerApplicationMaster(Utils.localHostName(), 0, "") + } + + private def waitForSparkMaster() { + logInfo("Waiting for Spark driver to be reachable.") + var driverUp = false + val hostport = args.userArgs(0) + val (driverHost, driverPort) = Utils.parseHostPort(hostport) + while(!driverUp) { + try { + val socket = new Socket(driverHost, driverPort) + socket.close() + logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) + driverUp = true + } catch { + case e: Exception => + logError("Failed to connect to driver at %s:%s, retrying ...". 
+ format(driverHost, driverPort)) + Thread.sleep(100) + } + } + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) + + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) + + actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") + } + + + private def allocateExecutors() { + + // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = + scala.collection.immutable.Map() + + yarnAllocator = YarnAllocationHandler.newAllocator( + yarnConf, + amClient, + appAttemptId, + args, + preferredNodeLocationData, + sparkConf) + + logInfo("Allocating " + args.numExecutors + " executors.") + // Wait until all containers have finished + // TODO: This is a bit ugly. Can we make it nicer? + // TODO: Handle container failure + + yarnAllocator.addResourceRequests(args.numExecutors) + while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { + yarnAllocator.allocateResources() + Thread.sleep(100) + } + + logInfo("All executors have launched.") + + } + + // TODO: We might want to extend this to allocate more containers in case they die ! + private def launchReporterThread(_sleepTime: Long): Thread = { + val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime + + val t = new Thread { + override def run() { + while (!driverClosed) { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - + yarnAllocator.getNumPendingAllocate + if (missingExecutorCount > 0) { + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) + } + sendProgress() + Thread.sleep(sleepTime) + } + } + } + // setting to daemon status, though this is usually not a good idea. + t.setDaemon(true) + t.start() + logInfo("Started progress reporter thread - sleep time : " + sleepTime) + t + } + + private def sendProgress() { + logDebug("Sending progress") + // simulated with an allocate request with no nodes requested ... + yarnAllocator.allocateResources() + } + + def finishApplicationMaster(status: FinalApplicationStatus) { + logInfo("finish ApplicationMaster with " + status) + amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */) + } + +} + + +object ExecutorLauncher { + def main(argStrings: Array[String]) { + val args = new ApplicationMasterArguments(argStrings) + new ExecutorLauncher(args).run() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala new file mode 100644 index 0000000..53c403f --- /dev/null +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.nio.ByteBuffer +import java.security.PrivilegedExceptionAction + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils +import org.apache.hadoop.yarn.api.protocolrecords._ +import org.apache.hadoop.yarn.client.api.NMClient +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.ipc.YarnRPC +import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} + +import org.apache.spark.{SparkConf, Logging} + + +class ExecutorRunnable( + container: Container, + conf: Configuration, + spConf: SparkConf, + masterAddress: String, + slaveId: String, + hostname: String, + executorMemory: Int, + executorCores: Int) + extends Runnable with ExecutorRunnableUtil with Logging { + + var rpc: YarnRPC = YarnRPC.create(conf) + var nmClient: NMClient = _ + val sparkConf = spConf + val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + + def run = { + logInfo("Starting Executor Container") + nmClient = NMClient.createNMClient() + nmClient.init(yarnConf) + nmClient.start() + startContainer + } + + def startContainer = { + logInfo("Setting up ContainerLaunchContext") + + val ctx = Records.newRecord(classOf[ContainerLaunchContext]) + .asInstanceOf[ContainerLaunchContext] + + val localResources = prepareLocalResources + ctx.setLocalResources(localResources) + + ctx.setEnvironment(env) + + val credentials = UserGroupInformation.getCurrentUser().getCredentials() + val dob = new DataOutputBuffer() + credentials.writeTokenStorageToStream(dob) + ctx.setTokens(ByteBuffer.wrap(dob.getData())) + + val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores) + + logInfo("Setting up executor with commands: " + commands) + ctx.setCommands(commands) + + // Send the start request to the ContainerManager + nmClient.startContainer(container, ctx) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala deleted file mode 100644 index f1c1fea..0000000 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.Socket -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import akka.actor._ -import akka.remote._ -import akka.actor.Terminated -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} -import org.apache.spark.util.{Utils, AkkaUtils} -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.scheduler.SplitInfo -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest - -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) - extends Logging { - - def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = - this(args, new Configuration(), sparkConf) - - def this(args: ApplicationMasterArguments) = this(args, new SparkConf()) - - private var appAttemptId: ApplicationAttemptId = _ - private var reporterThread: Thread = _ - private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - - private var yarnAllocator: YarnAllocationHandler = _ - private var driverClosed:Boolean = false - - private var amClient: AMRMClient[ContainerRequest] = _ - - val securityManager = new SecurityManager(sparkConf) - val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf, securityManager = securityManager)._1 - var actor: ActorRef = _ - - // This actor just working as a monitor to watch on Driver Actor. - class MonitorActor(driverUrl: String) extends Actor { - - var driver: ActorSelection = _ - - override def preStart() { - logInfo("Listen to driver: " + driverUrl) - driver = context.actorSelection(driverUrl) - // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events. - driver ! "Hello" - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - } - - override def receive = { - case x: DisassociatedEvent => - logInfo(s"Driver terminated or disconnected! Shutting down. $x") - driverClosed = true - } - } - - def run() { - - // Setup the directories so things go to yarn approved directories rather - // then user specified and /tmp. 
- System.setProperty("spark.local.dir", getLocalDirs()) - - amClient = AMRMClient.createAMRMClient() - amClient.init(yarnConf) - amClient.start() - - appAttemptId = getApplicationAttemptId() - registerApplicationMaster() - - waitForSparkMaster() - - // Allocate all containers - allocateWorkers() - - // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout - // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. - - val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // we want to be reasonably responsive without causing too many requests to RM. - val schedulerInterval = - System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong - // must be <= timeoutInterval / 2. - val interval = math.min(timeoutInterval / 2, schedulerInterval) - - reporterThread = launchReporterThread(interval) - - - // Wait for the reporter thread to Finish. - reporterThread.join() - - finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) - actorSystem.shutdown() - - logInfo("Exited") - System.exit(0) - } - - /** Get the Yarn approved local directories. */ - private def getLocalDirs(): String = { - // Hadoop 0.23 and 2.x have different Environment variable names for the - // local dirs, so lets check both. We assume one of the 2 is set. - // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X - val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) - .orElse(Option(System.getenv("LOCAL_DIRS"))) - - localDirs match { - case None => throw new Exception("Yarn Local dirs can't be empty") - case Some(l) => l - } - } - - private def getApplicationAttemptId(): ApplicationAttemptId = { - val envs = System.getenv() - val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) - val containerId = ConverterUtils.toContainerId(containerIdString) - val appAttemptId = containerId.getApplicationAttemptId() - logInfo("ApplicationAttemptId: " + appAttemptId) - appAttemptId - } - - private def registerApplicationMaster(): RegisterApplicationMasterResponse = { - logInfo("Registering the ApplicationMaster") - // TODO:(Raymond) Find out Spark UI address and fill in here? - amClient.registerApplicationMaster(Utils.localHostName(), 0, "") - } - - private def waitForSparkMaster() { - logInfo("Waiting for Spark driver to be reachable.") - var driverUp = false - val hostport = args.userArgs(0) - val (driverHost, driverPort) = Utils.parseHostPort(hostport) - while(!driverUp) { - try { - val socket = new Socket(driverHost, driverPort) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => - logError("Failed to connect to driver at %s:%s, retrying ...". - format(driverHost, driverPort)) - Thread.sleep(100) - } - } - sparkConf.set("spark.driver.host", driverHost) - sparkConf.set("spark.driver.port", driverPort.toString) - - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) - - actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") - } - - - private def allocateWorkers() { - - // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. 
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = - scala.collection.immutable.Map() - - yarnAllocator = YarnAllocationHandler.newAllocator( - yarnConf, - amClient, - appAttemptId, - args, - preferredNodeLocationData, - sparkConf) - - logInfo("Allocating " + args.numWorkers + " workers.") - // Wait until all containers have finished - // TODO: This is a bit ugly. Can we make it nicer? - // TODO: Handle container failure - - yarnAllocator.addResourceRequests(args.numWorkers) - while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) { - yarnAllocator.allocateResources() - Thread.sleep(100) - } - - logInfo("All workers have launched.") - - } - - // TODO: We might want to extend this to allocate more containers in case they die ! - private def launchReporterThread(_sleepTime: Long): Thread = { - val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime - - val t = new Thread { - override def run() { - while (!driverClosed) { - val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning - - yarnAllocator.getNumPendingAllocate - if (missingWorkerCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingWorkerCount)) - yarnAllocator.addResourceRequests(missingWorkerCount) - } - sendProgress() - Thread.sleep(sleepTime) - } - } - } - // setting to daemon status, though this is usually not a good idea. - t.setDaemon(true) - t.start() - logInfo("Started progress reporter thread - sleep time : " + sleepTime) - t - } - - private def sendProgress() { - logDebug("Sending progress") - // simulated with an allocate request with no nodes requested ... - yarnAllocator.allocateResources() - } - - def finishApplicationMaster(status: FinalApplicationStatus) { - logInfo("finish ApplicationMaster with " + status) - amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */) - } - -} - - -object WorkerLauncher { - def main(argStrings: Array[String]) { - val args = new ApplicationMasterArguments(argStrings) - new WorkerLauncher(args).run() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala deleted file mode 100644 index ab4a79b..0000000 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.nio.ByteBuffer -import java.security.PrivilegedExceptionAction - -import scala.collection.JavaConversions._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.client.api.NMClient -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} - -import org.apache.spark.{SparkConf, Logging} - - -class WorkerRunnable( - container: Container, - conf: Configuration, - spConf: SparkConf, - masterAddress: String, - slaveId: String, - hostname: String, - workerMemory: Int, - workerCores: Int) - extends Runnable with WorkerRunnableUtil with Logging { - - var rpc: YarnRPC = YarnRPC.create(conf) - var nmClient: NMClient = _ - val sparkConf = spConf - val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - - def run = { - logInfo("Starting Worker Container") - nmClient = NMClient.createNMClient() - nmClient.init(yarnConf) - nmClient.start() - startContainer - } - - def startContainer = { - logInfo("Setting up ContainerLaunchContext") - - val ctx = Records.newRecord(classOf[ContainerLaunchContext]) - .asInstanceOf[ContainerLaunchContext] - - val localResources = prepareLocalResources - ctx.setLocalResources(localResources) - - ctx.setEnvironment(env) - - val credentials = UserGroupInformation.getCurrentUser().getCredentials() - val dob = new DataOutputBuffer() - credentials.writeTokenStorageToStream(dob) - ctx.setTokens(ByteBuffer.wrap(dob.getData())) - - val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores) - - logInfo("Setting up worker with commands: " + commands) - ctx.setCommands(commands) - - // Send the start request to the ContainerManager - nmClient.startContainer(container, ctx) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/69837321/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 1ac6112..e31c406 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler( val conf: Configuration, val amClient: AMRMClient[ContainerRequest], val appAttemptId: ApplicationAttemptId, - val maxWorkers: Int, - val workerMemory: Int, - val workerCores: Int, + val maxExecutors: Int, + val executorMemory: Int, + val executorCores: Int, val preferredHostToCount: Map[String, Int], val preferredRackToCount: Map[String, Int], val sparkConf: SparkConf) @@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler( // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. 
private val numPendingAllocate = new AtomicInteger() - private val numWorkersRunning = new AtomicInteger() - // Used to generate a unique id per worker - private val workerIdCounter = new AtomicInteger() + private val numExecutorsRunning = new AtomicInteger() + // Used to generate a unique id per executor + private val executorIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() - private val numWorkersFailed = new AtomicInteger() + private val numExecutorsFailed = new AtomicInteger() def getNumPendingAllocate: Int = numPendingAllocate.intValue - def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - def getNumWorkersFailed: Int = numWorkersFailed.intValue + def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) } def releaseContainer(container: Container) { @@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler( logDebug(""" Allocated containers: %d - Current worker count: %d + Current executor count: %d Containers released: %s Containers to-be-released: %s Cluster resources: %s """.format( allocatedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers, allocateResponse.getAvailableResources)) @@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler( // Run each of the allocated containers. for (container <- allocatedContainersToProcess) { - val numWorkersRunningNow = numWorkersRunning.incrementAndGet() - val workerHostname = container.getNodeId.getHost + val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() + val executorHostname = container.getNodeId.getHost val containerId = container.getId - val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) - assert(container.getResource.getMemory >= workerMemoryOverhead) + val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + assert(container.getResource.getMemory >= executorMemoryOverhead) - if (numWorkersRunningNow > maxWorkers) { + if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of - containers for it.""".format(containerId, workerHostname)) + containers for it.""".format(containerId, executorHostname)) releaseContainer(container) - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() } else { - val workerId = workerIdCounter.incrementAndGet().toString + val executorId = executorIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - logInfo("Launching container %s for on host %s".format(containerId, workerHostname)) + logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) // To be safe, remove the container from `pendingReleaseContainers`. 
pendingReleaseContainers.remove(containerId) - val rack = YarnAllocationHandler.lookupRack(conf, workerHostname) + val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) containerSet += containerId - allocatedContainerToHostMap.put(containerId, workerHostname) + allocatedContainerToHostMap.put(containerId, executorHostname) if (rack != null) { allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } } - logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname)) - val workerRunnable = new WorkerRunnable( + logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname)) + val executorRunnable = new ExecutorRunnable( container, conf, sparkConf, driverUrl, - workerId, - workerHostname, - workerMemory, - workerCores) - new Thread(workerRunnable).start() + executorId, + executorHostname, + executorMemory, + executorCores) + new Thread(executorRunnable).start() } } logDebug(""" Finished allocating %s containers (from %s originally). - Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( allocatedContainersToProcess, allocatedContainers, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler( // `pendingReleaseContainers`. pendingReleaseContainers.remove(containerId) } else { - // Decrement the number of workers running. The next iteration of the ApplicationMaster's + // Decrement the number of executors running. The next iteration of the ApplicationMaster's // reporting thread will take care of allocating. - numWorkersRunning.decrementAndGet() + numExecutorsRunning.decrementAndGet() logInfo("Completed container %s (state: %s, exit status: %s)".format( containerId, completedContainer.getState, @@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler( // now I think its ok as none of the containers are expected to exit if (completedContainer.getExitStatus() != 0) { logInfo("Container marked as failed: " + containerId) - numWorkersFailed.incrementAndGet() + numExecutorsFailed.incrementAndGet() } } @@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler( } logDebug(""" Finished processing %d completed containers. 
- Current number of workers running: %d, + Current number of executors running: %d, releasedContainerList: %s, pendingReleaseContainers: %s """.format( completedContainers.size, - numWorkersRunning.get(), + numExecutorsRunning.get(), releasedContainerList, pendingReleaseContainers)) } @@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler( retval } - def addResourceRequests(numWorkers: Int) { + def addResourceRequests(numExecutors: Int) { val containerRequests: List[ContainerRequest] = - if (numWorkers <= 0 || preferredHostToCount.isEmpty) { - logDebug("numWorkers: " + numWorkers + ", host preferences: " + + if (numExecutors <= 0 || preferredHostToCount.isEmpty) { + logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty) createResourceRequests( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY).toList } else { - // Request for all hosts in preferred nodes and for numWorkers - + // Request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size) for ((candidateHost, candidateCount) <- preferredHostToCount) { @@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests = createResourceRequests( AllocationType.ANY, resource = null, - numWorkers, + numExecutors, YarnAllocationHandler.PRIORITY) val containerRequestBuffer = new ArrayBuffer[ContainerRequest]( @@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler( amClient.addContainerRequest(request) } - if (numWorkers > 0) { - numPendingAllocate.addAndGet(numWorkers) - logInfo("Will Allocate %d worker containers, each with %d memory".format( - numWorkers, - (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) + if (numExecutors > 0) { + numPendingAllocate.addAndGet(numExecutors) + logInfo("Will Allocate %d executor containers, each with %d memory".format( + numExecutors, + (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) } else { logDebug("Empty allocation request ...") } @@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequests( requestType: AllocationType.AllocationType, resource: String, - numWorkers: Int, + numExecutors: Int, priority: Int ): ArrayBuffer[ContainerRequest] = { @@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler( val nodeLocal = constructContainerRequests( Array(hostname), racks = null, - numWorkers, + numExecutors, priority) // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler. 
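The request-construction code in the hunks below builds one ContainerRequest per executor, sized as executorMemory plus a fixed overhead so the container has headroom beyond the JVM heap. A hedged sketch of the same YARN client API usage; the 384 MB overhead here is a placeholder for YarnAllocationHandler.MEMORY_OVERHEAD, whose actual value is defined outside this diff:

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.Records

// Sketch: build numExecutors identical container requests, padding the
// executor memory with a fixed overhead.
object ContainerRequestSketch {
  val MemoryOverhead = 384 // placeholder for YarnAllocationHandler.MEMORY_OVERHEAD

  def requestsFor(
      numExecutors: Int,
      executorMemory: Int,
      executorCores: Int,
      hosts: Array[String],
      racks: Array[String],
      priority: Int): Seq[ContainerRequest] = {
    val resource = Resource.newInstance(executorMemory + MemoryOverhead, executorCores)
    val prio = Records.newRecord(classOf[Priority])
    prio.setPriority(priority)
    // One request per executor; hosts and racks may be null (AllocationType.ANY).
    (0 until numExecutors).map(_ => new ContainerRequest(resource, hosts, racks, prio))
  }
}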
@@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler( } case AllocationType.RACK => { val rack = resource - constructContainerRequests(hosts = null, Array(rack), numWorkers, priority) + constructContainerRequests(hosts = null, Array(rack), numExecutors, priority) } case AllocationType.ANY => constructContainerRequests( - hosts = null, racks = null, numWorkers, priority) + hosts = null, racks = null, numExecutors, priority) case _ => throw new IllegalArgumentException( "Unexpected/unsupported request type: " + requestType) } @@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler( private def constructContainerRequests( hosts: Array[String], racks: Array[String], - numWorkers: Int, + numExecutors: Int, priority: Int ): ArrayBuffer[ContainerRequest] = { - val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD - val resource = Resource.newInstance(memoryRequest, workerCores) + val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val resource = Resource.newInstance(memoryRequest, executorCores) val prioritySetting = Records.newRecord(classOf[Priority]) prioritySetting.setPriority(priority) val requests = new ArrayBuffer[ContainerRequest]() - for (i <- 0 until numWorkers) { + for (i <- 0 until numExecutors) { requests += new ContainerRequest(resource, hosts, racks, prioritySetting) } requests @@ -574,9 +574,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, Map[String, Int](), Map[String, Int](), sparkConf) @@ -596,9 +596,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - args.numWorkers, - args.workerMemory, - args.workerCores, + args.numExecutors, + args.executorMemory, + args.executorCores, hostToSplitCount, rackToSplitCount, sparkConf) @@ -608,9 +608,9 @@ object YarnAllocationHandler { conf: Configuration, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - maxWorkers: Int, - workerMemory: Int, - workerCores: Int, + maxExecutors: Int, + executorMemory: Int, + executorCores: Int, map: collection.Map[String, collection.Set[SplitInfo]], sparkConf: SparkConf ): YarnAllocationHandler = { @@ -619,9 +619,9 @@ object YarnAllocationHandler { conf, amClient, appAttemptId, - maxWorkers, - workerMemory, - workerCores, + maxExecutors, + executorMemory, + executorCores, hostToCount, rackToCount, sparkConf)
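A closing note on the reporter threads above: both ApplicationMaster and ExecutorLauncher use the same loop, and the sleep time is capped at half of YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS so a heartbeat always lands inside YARN's AM expiry window. A self-contained sketch of that loop, with a stubbed allocator trait standing in for YarnAllocationHandler:

// Stub standing in for YarnAllocationHandler; only what the loop needs.
trait Allocator {
  def getNumExecutorsRunning: Int
  def getNumPendingAllocate: Int
  def addResourceRequests(n: Int): Unit
  def allocateResources(): Unit // an empty allocate doubles as the RM heartbeat
}

object ReporterThreadSketch {
  def launch(
      allocator: Allocator,
      numExecutors: Int,
      expiryIntervalMs: Long,
      schedulerIntervalMs: Long,
      keepRunning: () => Boolean): Thread = {
    // Heartbeat at most every expiry/2 ms, but no more often than the
    // configured interval (spark.yarn.scheduler.heartbeat.interval-ms).
    val sleepTime = math.min(expiryIntervalMs / 2, schedulerIntervalMs)
    val t = new Thread {
      override def run(): Unit = {
        while (keepRunning()) {
          // Re-request containers for any executors that were lost.
          val missing = numExecutors - allocator.getNumExecutorsRunning -
            allocator.getNumPendingAllocate
          if (missing > 0) allocator.addResourceRequests(missing)
          allocator.allocateResources()
          Thread.sleep(sleepTime)
        }
      }
    }
    t.setDaemon(true)
    t.start()
    t
  }
}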