http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
deleted file mode 100644
index 8772e26..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.RackResolver
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config._
-
-private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
-
-/**
- * This strategy calculates the optimal locality preferences of YARN containers by considering
- * the node ratio of pending tasks, the number of required cores/containers, and the locality of
- * currently existing and pending allocated containers. The goal of this algorithm is to maximize
- * the number of tasks that would run locally.
- *
- * Consider a situation in which we have 20 tasks that require (host1, host2, host3)
- * and 10 tasks that require (host1, host2, host4), where each container has 2 cores
- * and cpus per task is 1, so the required container number is 15,
- * and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
- *
- * 1. If the requested container number (18) is more than the required container number (15):
- *
- * requests for 5 containers with nodes: (host1, host2, host3, host4)
- * requests for 5 containers with nodes: (host1, host2, host3)
- * requests for 5 containers with nodes: (host1, host2)
- * requests for 3 containers with no locality preferences.
- *
- * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no
- * locality preferences.
- *
- * 2. If the requested container number (10) is less than or equal to the required container
- * number (15):
- *
- * requests for 4 containers with nodes: (host1, host2, host3, host4)
- * requests for 3 containers with nodes: (host1, host2, host3)
- * requests for 3 containers with nodes: (host1, host2)
- *
- * The placement ratio is 10 : 10 : 7 : 4, close to the expected ratio (3 : 3 : 2 : 1).
- *
- * 3. If containers exist but none of them can match the requested localities,
- * follow the methods of 1 and 2.
- *
- * 4. If containers exist and some of them can match the requested localities.
- * For example, if we have 1 container on each node (host1: 1, host2: 1, host3: 1, host4: 1),
- * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
- * then the newly requested containers on each node would be updated to (host1: 4, host2: 4,
- * host3: 3, host4: 1), 12 containers in total.
- *
- *   4.1 If the requested container number (18) is more than the newly required containers (12),
- *   follow method 1 with the updated ratio 4 : 4 : 3 : 1.
- *
- *   4.2 If the requested container number (10) is less than or equal to the newly required
- *   containers (12), follow method 2 with the updated ratio 4 : 4 : 3 : 1.
- *
- * 5. If containers exist and the existing localities can fully cover the requested localities.
- * For example, if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5),
- * which could cover the current requested localities, this algorithm will allocate all the
- * requested containers with no localities.
- */
-private[yarn] class LocalityPreferredContainerPlacementStrategy(
-    val sparkConf: SparkConf,
-    val yarnConf: Configuration,
-    val resource: Resource) {
-
-  /**
-   * Calculate each container's node locality and rack locality
-   * @param numContainer number of containers to calculate
-   * @param numLocalityAwareTasks number of locality required tasks
-   * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
-   *                             numbers running on it, used as hints for container allocation
-   * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
-   *                                     expected locality preference by considering the existing
-   *                                     containers
-   * @param localityMatchedPendingAllocations A sequence of pending container requests which
-   *                                          match the localities of current required tasks.
-   * @return node localities and rack localities; each locality is an array of strings,
-   *         and the length of localities is the same as the number of containers
-   */
-  def localityOfRequestedContainers(
-      numContainer: Int,
-      numLocalityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int],
-      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
-      localityMatchedPendingAllocations: Seq[ContainerRequest]
-    ): Array[ContainerLocalityPreferences] = {
-    val updatedHostToContainerCount = expectedHostToContainerCount(
-      numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
-        localityMatchedPendingAllocations)
-    val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
-
-    // The number of containers to allocate, divided into two groups, one with preferred locality,
-    // and the other without locality preference.
-    val requiredLocalityFreeContainerNum =
-      math.max(0, numContainer - updatedLocalityAwareContainerNum)
-    val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
-
-    val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
-    if (requiredLocalityFreeContainerNum > 0) {
-      for (i <- 0 until requiredLocalityFreeContainerNum) {
-        containerLocalityPreferences += ContainerLocalityPreferences(
-          null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
-      }
-    }
-
-    if (requiredLocalityAwareContainerNum > 0) {
-      val largestRatio = updatedHostToContainerCount.values.max
-      // Scale the preferred-locality ratios to the required number of locality-aware
-      // containers; this is used when calculating the locality-preferred hosts.
-      var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
-        val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
-        adjustedRatio.ceil.toInt
-      }
-
-      for (i <- 0 until requiredLocalityAwareContainerNum) {
-        // Only keep the hosts whose ratio is larger than 0, meaning the host can still be
-        // allocated new container requests.
-        val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
-        val racks = hosts.map { h =>
-          RackResolver.resolve(yarnConf, h).getNetworkLocation
-        }.toSet
-        containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
-
-        // Subtract 1 from each host's ratio each time the host is used. When a ratio reaches 0,
-        // the required ratio is satisfied and that host will not be allocated again.
-        preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
-      }
-    }
-
-    containerLocalityPreferences.toArray
-  }
-
-  /**
-   * Calculate the number of executors needed to satisfy the given number of pending tasks.
-   */
-  private def numExecutorsPending(numTasksPending: Int): Int = {
-    val coresPerExecutor = resource.getVirtualCores
-    (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
-  }
-
-  /**
-   * Calculate the expected host to number of containers by considering the allocated containers.
-   * @param localityAwareTasks number of locality aware tasks
-   * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
-   *                             numbers running on it, used as hints for container allocation
-   * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
-   *                                     expected locality preference by considering the existing
-   *                                     containers
-   * @param localityMatchedPendingAllocations A sequence of pending container requests which
-   *                                          match the localities of current required tasks.
-   * @return a map with hostname as key and required number of containers on this host as value
-   */
-  private def expectedHostToContainerCount(
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int],
-      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
-      localityMatchedPendingAllocations: Seq[ContainerRequest]
-    ): Map[String, Int] = {
-    val totalLocalTaskNum = hostToLocalTaskCount.values.sum
-    val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)
-
-    hostToLocalTaskCount.map { case (host, count) =>
-      val expectedCount =
-        count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
-      // Take the locality of pending containers into consideration
-      val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
-        pendingHostToContainersMap.getOrElse(host, 0.0)
-
-      // If the existing containers cannot fully satisfy the expected number of containers,
-      // the required container number is the expected count minus the existing count.
-      // Otherwise the required container number is 0.
-      (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
-    }
-  }
-
-  /**
-   * According to the locality ratio and the number of container requests, calculate the host to
-   * possible number of containers for pending allocated containers.
-   *
-   * If the current locality ratio of hosts is Host1 : Host2 : Host3 = 20 : 20 : 10,
-   * and there are 3 pending container requests, the possible number of containers on
-   * Host1 : Host2 : Host3 will be 1.2 : 1.2 : 0.6.
-   * @param localityMatchedPendingAllocations A sequence of pending container requests which
-   *                                          match the localities of current required tasks.
-   * @return a Map with hostname as key and possible number of containers on this host as value
-   */
-  private def pendingHostToContainerCount(
-      localityMatchedPendingAllocations: Seq[ContainerRequest]): Map[String, Double] = {
-    val pendingHostToContainerCount = new HashMap[String, Int]()
-    localityMatchedPendingAllocations.foreach { cr =>
-      cr.getNodes.asScala.foreach { n =>
-        val count = pendingHostToContainerCount.getOrElse(n, 0) + 1
-        pendingHostToContainerCount(n) = count
-      }
-    }
-
-    val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
-    val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
-    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
-      .toMap
-  }
-}
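For readers tracing the ratio arithmetic in the class doc above, here is a minimal, standalone Scala sketch of the same computation using the doc's numbers; the object and value names are illustrative, not Spark APIs:

    object PlacementRatioSketch {
      def main(args: Array[String]): Unit = {
        // 20 tasks prefer (host1, host2, host3); 10 tasks prefer (host1, host2, host4),
        // giving the per-host task counts from the class doc.
        val hostToLocalTaskCount = Map("host1" -> 30, "host2" -> 30, "host3" -> 20, "host4" -> 10)
        val coresPerExecutor = 2
        val cpusPerTask = 1
        val numTasksPending = 30
        // Same rounding-up division as numExecutorsPending: ceil(30 * 1 / 2) = 15.
        val requiredContainers =
          (numTasksPending * cpusPerTask + coresPerExecutor - 1) / coresPerExecutor
        val totalLocalTaskNum = hostToLocalTaskCount.values.sum
        // Expected containers per host, scaled by the host ratio as in
        // expectedHostToContainerCount: (host1: 5, host2: 5, host3: 4, host4: 2).
        val expected = hostToLocalTaskCount.mapValues { count =>
          math.ceil(count.toDouble * requiredContainers / totalLocalTaskNum).toInt
        }
        println(s"required=$requiredContainers expected=$expected")
      }
    }

Subtracting any containers already allocated on each host from these expected counts yields the updated request of (host1: 4, host2: 4, host3: 3, host4: 1) described in step 4 of the doc.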

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
deleted file mode 100644
index 0b66d1c..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ /dev/null
@@ -1,727 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.Collections
-import java.util.concurrent._
-import java.util.regex.Pattern
-
-import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.RackResolver
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
-
-/**
- * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
- * what to do with containers when YARN fulfills these requests.
- *
- * This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
- * * Making our resource needs known, which updates local bookkeeping about containers requested.
- * * Calling "allocate", which syncs our local container requests with the RM, and returns any
- *   containers that YARN has granted to us.  This also functions as a heartbeat.
- * * Processing the containers granted to us to possibly launch executors inside of them.
- *
- * The public methods of this class are thread-safe.  All methods that mutate state are
- * synchronized.
- * synchronized.
- */
-private[yarn] class YarnAllocator(
-    driverUrl: String,
-    driverRef: RpcEndpointRef,
-    conf: YarnConfiguration,
-    sparkConf: SparkConf,
-    amClient: AMRMClient[ContainerRequest],
-    appAttemptId: ApplicationAttemptId,
-    securityMgr: SecurityManager,
-    localResources: Map[String, LocalResource])
-  extends Logging {
-
-  import YarnAllocator._
-
-  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
-  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
-    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
-  }
-
-  // Visible for testing.
-  val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
-  val allocatedContainerToHostMap = new HashMap[ContainerId, String]
-
-  // Containers that we no longer care about. We've either already told the RM to release them or
-  // will on the next heartbeat. Containers get removed from this map after the RM tells us they've
-  // completed.
-  private val releasedContainers = Collections.newSetFromMap[ContainerId](
-    new ConcurrentHashMap[ContainerId, java.lang.Boolean])
-
-  @volatile private var numExecutorsRunning = 0
-
-  /**
-   * Used to generate a unique ID per executor
-   *
-   * Init `executorIdCounter`. When the AM restarts, `executorIdCounter` is reset to 0, so the
-   * ids of new executors would start from 1 again and conflict with executors created before.
-   * To avoid this, we initialize `executorIdCounter` by getting the max executorId from the
-   * driver.
-   *
-   * This executorId conflict only happens in yarn-client mode, so it is an issue there. For
-   * more details, see the JIRA.
-   *
-   * @see SPARK-12864
-   */
-  private var executorIdCounter: Int =
-    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
-
-  // Queue to store the timestamp of failed executors
-  private val failedExecutorsTimeStamps = new Queue[Long]()
-
-  private var clock: Clock = new SystemClock
-
-  private val executorFailuresValidityInterval =
-    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
-
-  @volatile private var targetNumExecutors =
-    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
-
-  // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
-  // list of requesters that should be responded to once we find out why the given executor
-  // was lost.
-  private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
-
-  // Maintain loss reasons for already released executors: a reason is added when it is received
-  // from the AM-RM call, and removed after this loss reason is queried.
-  private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
-
-  // Keep track of which container is running which executor to remove the executors later
-  // Visible for testing.
-  private[yarn] val executorIdToContainer = new HashMap[String, Container]
-
-  private var numUnexpectedContainerRelease = 0L
-  private val containerIdToExecutorId = new HashMap[ContainerId, String]
-
-  // Executor memory in MB.
-  protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
-  // Additional memory overhead.
-  protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
-  // Number of cores per executor.
-  protected val executorCores = sparkConf.get(EXECUTOR_CORES)
-  // Resource capability requested for each executor.
-  private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
-
-  private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
-    "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
-
-  // For testing
-  private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
-
-  private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
-
-  // ContainerRequest constructor that can take a node label expression. We grab it through
-  // reflection because it's only available in later versions of YARN.
-  private val nodeLabelConstructor = labelExpression.flatMap { expr =>
-    try {
-      Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
-        classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
-        classOf[String]))
-    } catch {
-      case e: NoSuchMethodException =>
-        logWarning(s"Node label expression $expr will be ignored because YARN version on" +
-          " classpath does not support it.")
-        None
-    }
-  }
-
-  // A map to store preferred hostname and possible task numbers running on it.
-  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
-
-  // Number of tasks that have locality preferences in active stages
-  private var numLocalityAwareTasks: Int = 0
-
-  // A container placement strategy based on pending tasks' locality preference
-  private[yarn] val containerPlacementStrategy =
-    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
-
-  /**
-   * Use a different clock for YarnAllocator. This is mainly used for testing.
-   */
-  def setClock(newClock: Clock): Unit = {
-    clock = newClock
-  }
-
-  def getNumExecutorsRunning: Int = numExecutorsRunning
-
-  def getNumExecutorsFailed: Int = synchronized {
-    val endTime = clock.getTimeMillis()
-
-    while (executorFailuresValidityInterval > 0
-      && failedExecutorsTimeStamps.nonEmpty
-      && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
-      failedExecutorsTimeStamps.dequeue()
-    }
-
-    failedExecutorsTimeStamps.size
-  }
-
-  /**
-   * A sequence of pending container requests that have not yet been fulfilled.
-   */
-  def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
-
-  /**
-   * A sequence of pending container requests at the given location that have not yet been
-   * fulfilled.
-   */
-  private def getPendingAtLocation(location: String): Seq[ContainerRequest] = {
-    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala
-      .flatMap(_.asScala)
-      .toSeq
-  }
-
-  /**
-   * Request as many executors from the ResourceManager as needed to reach the desired total. If
-   * the requested total is smaller than the current number of running executors, no executors will
-   * be killed.
-   * @param requestedTotal total number of containers requested
-   * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
-   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
-   *                             container placement hint.
-   * @return Whether the new requested total is different than the old value.
-   */
-  def requestTotalExecutorsWithPreferredLocalities(
-      requestedTotal: Int,
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
-    this.numLocalityAwareTasks = localityAwareTasks
-    this.hostToLocalTaskCounts = hostToLocalTaskCount
-
-    if (requestedTotal != targetNumExecutors) {
-      logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
-      targetNumExecutors = requestedTotal
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * Request that the ResourceManager release the container running the specified executor.
-   */
-  def killExecutor(executorId: String): Unit = synchronized {
-    if (executorIdToContainer.contains(executorId)) {
-      val container = executorIdToContainer.get(executorId).get
-      internalReleaseContainer(container)
-      numExecutorsRunning -= 1
-    } else {
-      logWarning(s"Attempted to kill unknown executor $executorId!")
-    }
-  }
-
-  /**
-   * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
-   * equal to maxExecutors.
-   *
-   * Deal with any containers YARN has granted to us by possibly launching executors in them.
-   *
-   * This must be synchronized because variables read in this method are mutated by other methods.
-   */
-  def allocateResources(): Unit = synchronized {
-    updateResourceRequests()
-
-    val progressIndicator = 0.1f
-    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
-    // requests.
-    val allocateResponse = amClient.allocate(progressIndicator)
-
-    val allocatedContainers = allocateResponse.getAllocatedContainers()
-
-    if (allocatedContainers.size > 0) {
-      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
-        .format(
-          allocatedContainers.size,
-          numExecutorsRunning,
-          allocateResponse.getAvailableResources))
-
-      handleAllocatedContainers(allocatedContainers.asScala)
-    }
-
-    val completedContainers = allocateResponse.getCompletedContainersStatuses()
-    if (completedContainers.size > 0) {
-      logDebug("Completed %d containers".format(completedContainers.size))
-      processCompletedContainers(completedContainers.asScala)
-      logDebug("Finished processing %d completed containers. Current running executor count: %d."
-        .format(completedContainers.size, numExecutorsRunning))
-    }
-  }
-
-  /**
-   * Update the set of container requests that we will sync with the RM based on the number of
-   * executors we have currently running and our target number of executors.
-   *
-   * Visible for testing.
-   */
-  def updateResourceRequests(): Unit = {
-    val pendingAllocate = getPendingAllocate
-    val numPendingAllocate = pendingAllocate.size
-    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
-
-    if (missing > 0) {
-      logInfo(s"Will request $missing executor container(s), each with " +
-        s"${resource.getVirtualCores} core(s) and " +
-        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
-
-      // Split the pending container requests into three groups: locality matched,
-      // locality unmatched, and locality free. Locality-matched requests are taken into
-      // consideration for container placement and treated as allocated containers.
-      // Locality-unmatched and locality-free requests are cancelled, since the required
-      // locality preferences have changed, and are recalculated using the container
-      // placement strategy.
-      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
-        hostToLocalTaskCounts, pendingAllocate)
-
-      // cancel "stale" requests for locations that are no longer needed
-      staleRequests.foreach { stale =>
-        amClient.removeContainerRequest(stale)
-      }
-      val cancelledContainers = staleRequests.size
-      if (cancelledContainers > 0) {
-        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
-      }
-
-      // consider the number of new containers and cancelled stale containers available
-      val availableContainers = missing + cancelledContainers
-
-      // to maximize locality, include requests with no locality preference that can be cancelled
-      val potentialContainers = availableContainers + anyHostRequests.size
-
-      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
-        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
-          allocatedHostToContainersMap, localRequests)
-
-      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
-      containerLocalityPreferences.foreach {
-        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
-          newLocalityRequests += createContainerRequest(resource, nodes, racks)
-        case _ =>
-      }
-
-      if (availableContainers >= newLocalityRequests.size) {
-        // more containers are available than needed for locality, fill in requests for any host
-        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
-          newLocalityRequests += createContainerRequest(resource, null, null)
-        }
-      } else {
-        val numToCancel = newLocalityRequests.size - availableContainers
-        // cancel some requests without locality preferences to schedule more local containers
-        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
-          amClient.removeContainerRequest(nonLocal)
-        }
-        if (numToCancel > 0) {
-          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
-        }
-      }
-
-      newLocalityRequests.foreach { request =>
-        amClient.addContainerRequest(request)
-      }
-
-      if (log.isInfoEnabled()) {
-        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
-        if (anyHost.nonEmpty) {
-          logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
-        }
-        localized.foreach { request =>
-          logInfo(s"Submitted container request for host ${hostStr(request)}.")
-        }
-      }
-    } else if (numPendingAllocate > 0 && missing < 0) {
-      val numToCancel = math.min(numPendingAllocate, -missing)
-      logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
-        s"total $targetNumExecutors executors.")
-
-      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
-      if (!matchingRequests.isEmpty) {
-        matchingRequests.iterator().next().asScala
-          .take(numToCancel).foreach(amClient.removeContainerRequest)
-      } else {
-        logWarning("Expected to find pending requests, but found none.")
-      }
-    }
-  }
-
-  private def hostStr(request: ContainerRequest): String = {
-    Option(request.getNodes) match {
-      case Some(nodes) => nodes.asScala.mkString(",")
-      case None => "Any"
-    }
-  }
-
-  /**
-   * Creates a container request, handling the reflection required to use YARN features that were
-   * added in recent versions.
-   */
-  private def createContainerRequest(
-      resource: Resource,
-      nodes: Array[String],
-      racks: Array[String]): ContainerRequest = {
-    nodeLabelConstructor.map { constructor =>
-      constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
-        labelExpression.orNull)
-    }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
-  }
-
-  /**
-   * Handle containers granted by the RM by launching executors on them.
-   *
-   * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
-   * in YARN granting containers that we no longer need. In this case, we release them.
-   *
-   * Visible for testing.
-   */
-  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
-    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
-
-    // Match incoming requests by host
-    val remainingAfterHostMatches = new ArrayBuffer[Container]
-    for (allocatedContainer <- allocatedContainers) {
-      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
-        containersToUse, remainingAfterHostMatches)
-    }
-
-    // Match remaining by rack
-    val remainingAfterRackMatches = new ArrayBuffer[Container]
-    for (allocatedContainer <- remainingAfterHostMatches) {
-      val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
-      matchContainerToRequest(allocatedContainer, rack, containersToUse,
-        remainingAfterRackMatches)
-    }
-
-    // Assign remaining that are neither node-local nor rack-local
-    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
-    for (allocatedContainer <- remainingAfterRackMatches) {
-      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
-        remainingAfterOffRackMatches)
-    }
-
-    if (!remainingAfterOffRackMatches.isEmpty) {
-      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
-        s"allocated to us")
-      for (container <- remainingAfterOffRackMatches) {
-        internalReleaseContainer(container)
-      }
-    }
-
-    runAllocatedContainers(containersToUse)
-
-    logInfo("Received %d containers from YARN, launching executors on %d of them."
-      .format(allocatedContainers.size, containersToUse.size))
-  }
-
-  /**
-   * Looks for requests for the given location that match the given container allocation. If it
-   * finds one, removes the request so that it won't be submitted again. Places the container into
-   * containersToUse or remaining.
-   *
-   * @param allocatedContainer container that was given to us by YARN
-   * @param location resource name, either a node, rack, or *
-   * @param containersToUse list of containers that will be used
-   * @param remaining list of containers that will not be used
-   */
-  private def matchContainerToRequest(
-      allocatedContainer: Container,
-      location: String,
-      containersToUse: ArrayBuffer[Container],
-      remaining: ArrayBuffer[Container]): Unit = {
-    // SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
-    // request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
-    // memory, but use the asked vcore count for matching, effectively disabling matching on vcore
-    // count.
-    val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
-          resource.getVirtualCores)
-    val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
-      matchingResource)
-
-    // Match the allocation to a request
-    if (!matchingRequests.isEmpty) {
-      val containerRequest = matchingRequests.get(0).iterator.next
-      amClient.removeContainerRequest(containerRequest)
-      containersToUse += allocatedContainer
-    } else {
-      remaining += allocatedContainer
-    }
-  }
-
-  /**
-   * Launches executors in the allocated containers.
-   */
-  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
-    for (container <- containersToUse) {
-      executorIdCounter += 1
-      val executorHostname = container.getNodeId.getHost
-      val containerId = container.getId
-      val executorId = executorIdCounter.toString
-      assert(container.getResource.getMemory >= resource.getMemory)
-      logInfo(s"Launching container $containerId on host $executorHostname")
-
-      def updateInternalState(): Unit = synchronized {
-        numExecutorsRunning += 1
-        executorIdToContainer(executorId) = container
-        containerIdToExecutorId(container.getId) = executorId
-
-        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-          new HashSet[ContainerId])
-        containerSet += containerId
-        allocatedContainerToHostMap.put(containerId, executorHostname)
-      }
-
-      if (numExecutorsRunning < targetNumExecutors) {
-        if (launchContainers) {
-          launcherPool.execute(new Runnable {
-            override def run(): Unit = {
-              try {
-                new ExecutorRunnable(
-                  Some(container),
-                  conf,
-                  sparkConf,
-                  driverUrl,
-                  executorId,
-                  executorHostname,
-                  executorMemory,
-                  executorCores,
-                  appAttemptId.getApplicationId.toString,
-                  securityMgr,
-                  localResources
-                ).run()
-                updateInternalState()
-              } catch {
-                case NonFatal(e) =>
-                  logError(s"Failed to launch executor $executorId on container $containerId", e)
-                  // Assigned container should be released immediately to avoid unnecessary resource
-                  // occupation.
-                  amClient.releaseAssignedContainer(containerId)
-              }
-            }
-          })
-        } else {
-          // For test only
-          updateInternalState()
-        }
-      } else {
-        logInfo(("Skip launching executorRunnable as running Executors count: %d " +
-          "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
-      }
-    }
-  }
-
-  // Visible for testing.
-  private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
-    for (completedContainer <- completedContainers) {
-      val containerId = completedContainer.getContainerId
-      val alreadyReleased = releasedContainers.remove(containerId)
-      val hostOpt = allocatedContainerToHostMap.get(containerId)
-      val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
-      val exitReason = if (!alreadyReleased) {
-        // Decrement the number of executors running. The next iteration of
-        // the ApplicationMaster's reporting thread will take care of allocating.
-        numExecutorsRunning -= 1
-        logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
-          containerId,
-          onHostStr,
-          completedContainer.getState,
-          completedContainer.getExitStatus))
-        // Hadoop 2.2.x added a ContainerExitStatus we should switch to using. There are some
-        // exit statuses we shouldn't necessarily count against us, but for now that's OK, since
-        // none of the containers are expected to exit.
-        val exitStatus = completedContainer.getExitStatus
-        val (exitCausedByApp, containerExitReason) = exitStatus match {
-          case ContainerExitStatus.SUCCESS =>
-            (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
-              "pre-emption) and not because of an error in the running job.")
-          case ContainerExitStatus.PREEMPTED =>
-            // Preemption is not the fault of the running tasks, since YARN preempts containers
-            // merely to do resource sharing, and tasks that fail due to preempted executors could
-            // just as easily finish on any other executor. See SPARK-8167.
-            (false, s"Container ${containerId}${onHostStr} was preempted.")
-          // Should probably still count memory exceeded exit codes towards task failures
-          case VMEM_EXCEEDED_EXIT_CODE =>
-            (true, memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              VMEM_EXCEEDED_PATTERN))
-          case PMEM_EXCEEDED_EXIT_CODE =>
-            (true, memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              PMEM_EXCEEDED_PATTERN))
-          case _ =>
-            // Enqueue the timestamp of failed executor
-            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
-            (true, "Container marked as failed: " + containerId + onHostStr +
-              ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
-
-        }
-        if (exitCausedByApp) {
-          logWarning(containerExitReason)
-        } else {
-          logInfo(containerExitReason)
-        }
-        ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
-      } else {
-        // If we have already released this container, then it must mean
-        // that the driver has explicitly requested it to be killed
-        ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
-          s"Container $containerId exited from explicit termination request.")
-      }
-
-      for {
-        host <- hostOpt
-        containerSet <- allocatedHostToContainersMap.get(host)
-      } {
-        containerSet.remove(containerId)
-        if (containerSet.isEmpty) {
-          allocatedHostToContainersMap.remove(host)
-        } else {
-          allocatedHostToContainersMap.update(host, containerSet)
-        }
-
-        allocatedContainerToHostMap.remove(containerId)
-      }
-
-      containerIdToExecutorId.remove(containerId).foreach { eid =>
-        executorIdToContainer.remove(eid)
-        pendingLossReasonRequests.remove(eid) match {
-          case Some(pendingRequests) =>
-            // Notify application of executor loss reasons so it can decide whether it should abort
-            pendingRequests.foreach(_.reply(exitReason))
-
-          case None =>
-            // We cannot find the executor among the pending-reason requests because the completed
-            // container is processed before the pending result is queried, so we store the reason
-            // for a later query. This usually happens when a container is explicitly killed: the
-            // result is returned in one AM-RM communication, so the query RPC arrives after this
-            // completed container is processed.
-            releasedExecutorLossReasons.put(eid, exitReason)
-        }
-        if (!alreadyReleased) {
-          // The executor could have gone away (like no route to host, node failure, etc)
-          // Notify backend about the failure of the executor
-          numUnexpectedContainerRelease += 1
-          driverRef.send(RemoveExecutor(eid, exitReason))
-        }
-      }
-    }
-  }
-
-  /**
-   * Register that some RpcCallContext has asked the AM why the executor was lost. Note that
-   * we can only find the loss reason to send back in the next call to allocateResources().
-   */
-  private[yarn] def enqueueGetLossReasonRequest(
-      eid: String,
-      context: RpcCallContext): Unit = synchronized {
-    if (executorIdToContainer.contains(eid)) {
-      pendingLossReasonRequests
-        .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
-    } else if (releasedExecutorLossReasons.contains(eid)) {
-      // Executor was already released explicitly before getting the loss reason, so directly send
-      // the pre-stored loss reason
-      context.reply(releasedExecutorLossReasons.remove(eid).get)
-    } else {
-      logWarning(s"Tried to get the loss reason for non-existent executor $eid")
-      context.sendFailure(
-        new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
-    }
-  }
-
-  private def internalReleaseContainer(container: Container): Unit = {
-    releasedContainers.add(container.getId())
-    amClient.releaseAssignedContainer(container.getId())
-  }
-
-  private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
-
-  private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
-    pendingLossReasonRequests.size
-  }
-
-  /**
-   * Split the pending container requests into 3 groups based on the current localities of pending
-   * tasks.
-   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
-   *                             container placement hint.
-   * @param pendingAllocations A sequence of pending allocation container requests.
-   * @return A tuple of 3 sequences: first a sequence of locality matched container requests,
-   *         second a sequence of locality unmatched container requests, and third a sequence of
-   *         locality free container requests.
-   */
-  private def splitPendingAllocationsByLocality(
-      hostToLocalTaskCount: Map[String, Int],
-      pendingAllocations: Seq[ContainerRequest]
-    ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
-    val localityMatched = ArrayBuffer[ContainerRequest]()
-    val localityUnMatched = ArrayBuffer[ContainerRequest]()
-    val localityFree = ArrayBuffer[ContainerRequest]()
-
-    val preferredHosts = hostToLocalTaskCount.keySet
-    pendingAllocations.foreach { cr =>
-      val nodes = cr.getNodes
-      if (nodes == null) {
-        localityFree += cr
-      } else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) {
-        localityMatched += cr
-      } else {
-        localityUnMatched += cr
-      }
-    }
-
-    (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
-  }
-
-}
-
-private object YarnAllocator {
-  val MEM_REGEX = "[0-9.]+ [KMG]B"
-  val PMEM_EXCEEDED_PATTERN =
-    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
-  val VMEM_EXCEEDED_PATTERN =
-    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
-  val VMEM_EXCEEDED_EXIT_CODE = -103
-  val PMEM_EXCEEDED_EXIT_CODE = -104
-
-  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
-    val matcher = pattern.matcher(diagnostics)
-    val diag = if (matcher.find()) " " + matcher.group() + "." else ""
-    ("Container killed by YARN for exceeding memory limits." + diag
-      + " Consider boosting spark.yarn.executor.memoryOverhead.")
-  }
-}
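The memLimitExceededLogMessage helper above is easy to exercise in isolation. A small standalone sketch, using a made-up diagnostics string of the shape YARN produces (the string itself is illustrative, not from the patch):

    import java.util.regex.Pattern

    object MemDiagSketch {
      val MEM_REGEX = "[0-9.]+ [KMG]B"
      val PMEM_EXCEEDED_PATTERN =
        Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")

      def main(args: Array[String]): Unit = {
        val diagnostics =
          "Container killed on request. 2.5 GB of 2.4 GB physical memory used; killing container."
        val matcher = PMEM_EXCEEDED_PATTERN.matcher(diagnostics)
        val diag = if (matcher.find()) " " + matcher.group() + "." else ""
        // Prints the same advice string the allocator logs, with the matched
        // "2.5 GB of 2.4 GB physical memory used" fragment spliced in.
        println("Container killed by YARN for exceeding memory limits." + diag +
          " Consider boosting spark.yarn.executor.memoryOverhead.")
      }
    }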

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
deleted file mode 100644
index 53df11e..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.{List => JList}
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.util.Utils
-
-/**
- * Handles registering and unregistering the application with the YARN ResourceManager.
- */
-private[spark] class YarnRMClient extends Logging {
-
-  private var amClient: AMRMClient[ContainerRequest] = _
-  private var uiHistoryAddress: String = _
-  private var registered: Boolean = false
-
-  /**
-   * Registers the application master with the RM.
-   *
-   * @param conf The Yarn configuration.
-   * @param sparkConf The Spark configuration.
-   * @param uiAddress Address of the SparkUI.
-   * @param uiHistoryAddress Address of the application on the History Server.
-   * @param securityMgr The security manager.
-   * @param localResources Map with information about files distributed via YARN's cache.
-   */
-  def register(
-      driverUrl: String,
-      driverRef: RpcEndpointRef,
-      conf: YarnConfiguration,
-      sparkConf: SparkConf,
-      uiAddress: String,
-      uiHistoryAddress: String,
-      securityMgr: SecurityManager,
-      localResources: Map[String, LocalResource]
-    ): YarnAllocator = {
-    amClient = AMRMClient.createAMRMClient()
-    amClient.init(conf)
-    amClient.start()
-    this.uiHistoryAddress = uiHistoryAddress
-
-    logInfo("Registering the ApplicationMaster")
-    synchronized {
-      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
-      registered = true
-    }
-    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
-      localResources)
-  }
-
-  /**
-   * Unregister the AM. Guaranteed to only be called once.
-   *
-   * @param status The final status of the AM.
-   * @param diagnostics Diagnostics message to include in the final status.
-   */
-  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
-    if (registered) {
-      amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
-    }
-  }
-
-  /** Returns the attempt ID. */
-  def getAttemptId(): ApplicationAttemptId = {
-    YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
-  }
-
-  /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
-  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
-    // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
-    // so not all stable releases have it.
-    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
-      .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
-    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
-    try {
-      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
-        classOf[Configuration])
-      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
-      val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) }
-      val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
-      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
-    } catch {
-      case e: NoSuchMethodException =>
-        val proxy = WebAppUtils.getProxyHostAndPort(conf)
-        val parts = proxy.split(":")
-        val uriBase = prefix + proxy + proxyBase
-        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
-    }
-  }
-
-  /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
-    val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
-    val yarnMaxAttempts = yarnConf.getInt(
-      YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
-    val retval: Int = sparkMaxAttempts match {
-      case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
-      case None => yarnMaxAttempts
-    }
-
-    retval
-  }
-
-}
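The proxy-parameter construction in getAmIpFilterParams can also be followed with plain collections. A standalone sketch of the HA branch, with hypothetical ResourceManager addresses and proxy base (none of these values come from the patch):

    object AmFilterParamsSketch {
      def main(args: Array[String]): Unit = {
        val prefix = "http://"
        val proxyBase = "/proxy/application_1"
        val proxies = Seq("rm1.example.com:8088", "rm2.example.com:8088")
        // One host per RM, and one URI base per RM, each joined with commas.
        val hosts = proxies.map(_.split(":")(0))
        val uriBases = proxies.map(p => prefix + p + proxyBase)
        // Map(PROXY_HOSTS -> rm1.example.com,rm2.example.com,
        //     PROXY_URI_BASES -> http://rm1.example.com:8088/proxy/application_1,...)
        println(Map("PROXY_HOSTS" -> hosts.mkString(","),
          "PROXY_URI_BASES" -> uriBases.mkString(",")))
      }
    }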

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
deleted file mode 100644
index cc53b1b..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.File
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.regex.Matcher
-import java.util.regex.Pattern
-
-import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater}
-import org.apache.spark.internal.config._
-import org.apache.spark.launcher.YarnCommandBuilderUtils
-import org.apache.spark.util.Utils
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class YarnSparkHadoopUtil extends SparkHadoopUtil {
-
-  private var credentialUpdater: CredentialUpdater = _
-
-  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
-    dest.addCredentials(source.getCredentials())
-  }
-
-  // Note that all params which start with SPARK are propagated all the way through, so if in yarn
-  // mode, this MUST be set to true.
-  override def isYarnMode(): Boolean = { true }
-
-  // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop
-  // subsystems. Always create a new config, don't reuse yarnConf.
-  override def newConfiguration(conf: SparkConf): Configuration =
-    new YarnConfiguration(super.newConfiguration(conf))
-
-  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
-  // cluster
-  override def addCredentials(conf: JobConf) {
-    val jobCreds = conf.getCredentials()
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
-  }
-
-  override def getCurrentUserCredentials(): Credentials = {
-    UserGroupInformation.getCurrentUser().getCredentials()
-  }
-
-  override def addCurrentUserCredentials(creds: Credentials) {
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
-  }
-
-  override def addSecretKeyToUserCredentials(key: String, secret: String) {
-    val creds = new Credentials()
-    creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
-    addCurrentUserCredentials(creds)
-  }
-
-  override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
-    val credentials = getCurrentUserCredentials()
-    if (credentials != null) credentials.getSecretKey(new Text(key)) else null
-  }
-
-  private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = {
-    credentialUpdater =
-      new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater()
-    credentialUpdater.start()
-  }
-
-  private[spark] override def stopCredentialUpdater(): Unit = {
-    if (credentialUpdater != null) {
-      credentialUpdater.stop()
-      credentialUpdater = null
-    }
-  }
-
-  private[spark] def getContainerId: ContainerId = {
-    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
-    ConverterUtils.toContainerId(containerIdString)
-  }
-}
-
-object YarnSparkHadoopUtil {
-  // Additional memory overhead
-  // 10% was arrived at experimentally, in the interest of minimizing memory waste while covering
-  // the common cases. Memory overhead tends to grow with container size.
-
-  val MEMORY_OVERHEAD_FACTOR = 0.10
-  val MEMORY_OVERHEAD_MIN = 384L
-
-  val ANY_HOST = "*"
-
-  val DEFAULT_NUMBER_EXECUTORS = 2
-
-  // All RM requests are issued with the same priority: we do not (yet) have any distinction between
-  // request types (like map/reduce in hadoop for example)
-  val RM_REQUEST_PRIORITY = Priority.newInstance(1)
-
-  def get: YarnSparkHadoopUtil = {
-    val yarnMode = java.lang.Boolean.parseBoolean(
-      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if (!yarnMode) {
-      throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!")
-    }
-    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
-  }
-  /**
-   * Add a path variable to the given environment map.
-   * If the map already contains this key, append the value to the existing value instead.
-   */
-  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
-    val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
-    env.put(key, newValue)
-  }
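
A minimal usage sketch (the paths are hypothetical; the separator comes from getClassPathSeparator, typically ':' on Unix-like systems):

    import scala.collection.mutable.HashMap

    val env = HashMap("CLASSPATH" -> "/opt/spark/jars/*")
    // The key already exists, so the value is appended with the class path separator.
    YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/opt/hadoop/lib/*")
    // env("CLASSPATH") == "/opt/spark/jars/*:/opt/hadoop/lib/*" on Unix-like systems
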
-
-  /**
-   * Set zero or more environment variables specified by the given input string.
-   * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
-   */
-  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
-    if (inputString != null && inputString.length() > 0) {
-      val childEnvs = inputString.split(",")
-      val p = Pattern.compile(environmentVariableRegex)
-      for (cEnv <- childEnvs) {
-        val parts = cEnv.split("=") // split on '='
-        val m = p.matcher(parts(1))
-        val sb = new StringBuffer
-        while (m.find()) {
-          val variable = m.group(1)
-          var replace = ""
-          if (env.get(variable) != None) {
-            replace = env.get(variable).get
-          } else {
-            // if this key is not configured for the child, get it from the env
-            replace = System.getenv(variable)
-            if (replace == null) {
-              // the env key is not present anywhere, simply set it to the empty string
-              replace = ""
-            }
-          }
-          m.appendReplacement(sb, Matcher.quoteReplacement(replace))
-        }
-        m.appendTail(sb)
-        // This treats the environment variable as a path variable delimited by
-        // `File.pathSeparator`. This is kept for backward compatibility and consistency with
-        // Hadoop's behavior.
-        addPathToEnvironment(env, parts(0), sb.toString)
-      }
-    }
-  }
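
For illustration, with the input format described in the scaladoc (the variable values are hypothetical):

    import scala.collection.mutable.HashMap

    val env = HashMap[String, String]()
    // "$JAVA_HOME" is resolved from the map if present, otherwise from the parent process env.
    YarnSparkHadoopUtil.setEnvFromInputString(env, "PATH=$JAVA_HOME/bin,SPARK_LOG_DIR=/var/log/spark")
    // env now contains "PATH" and "SPARK_LOG_DIR" with the expanded values.
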
-
-  private val environmentVariableRegex: String = {
-    if (Utils.isWindows) {
-      "%([A-Za-z_][A-Za-z0-9_]*?)%"
-    } else {
-      "\\$([A-Za-z_][A-Za-z0-9_]*)"
-    }
-  }
-
-  /**
-   * Kill if OOM is raised - leverage YARN's failure handling to cause rescheduling.
-   * Not killing the task leaves various aspects of the executor and (to some extent) the JVM
-   * in an inconsistent state.
-   * TODO: If the OOM is not recoverable by rescheduling it on a different node, then do
-   * 'something' to fail the job ... akin to blacklisting trackers in mapred?
-   *
-   * The handler invoked if an OOM exception is thrown by the JVM must be configured differently
-   * on Windows: the 'taskkill' command should be used, whereas Unix-based systems use 'kill'.
-   *
-   * As the JVM interprets both %p and %%p as the same, we can use either of them. However,
-   * some tests on Windows computers suggest that the JVM only accepts '%%p'.
-   *
-   * Furthermore, the behavior of the character '%' on the Windows command line differs from
-   * the behavior of '%' in a .cmd file: it gets interpreted as an incomplete environment
-   * variable. Windows .cmd files escape a '%' by '%%'. Thus, the correct way of writing
-   * '%%p' in an escaped way is '%%%%p'.
-   */
-  private[yarn] def addOutOfMemoryErrorArgument(javaOpts: ListBuffer[String]): Unit = {
-    if (!javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) {
-      if (Utils.isWindows) {
-        javaOpts += escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p")
-      } else {
-        javaOpts += "-XX:OnOutOfMemoryError='kill %p'"
-      }
-    }
-  }
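
A small sketch of what the method appends, callable from inside the yarn package since the method is private[yarn] (the Unix-like case is shown; the initial option is hypothetical):

    import scala.collection.mutable.ListBuffer

    val javaOpts = ListBuffer("-Xmx4g")
    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
    // On Unix-like systems: javaOpts == ListBuffer("-Xmx4g", "-XX:OnOutOfMemoryError='kill %p'")
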
-
-  /**
-   * Escapes a string for inclusion in a command line executed by YARN. YARN executes commands
-   * using either:
-   *
-   * (Unix-based) `bash -c "command arg1 arg2"`, which means plain quoting doesn't really work.
-   * The argument is enclosed in single quotes and some key characters are escaped.
-   *
-   * (Windows-based) part of a .cmd file, in which case Windows escaping must be applied to
-   * each argument. Windows is quite lenient, however it is usually Java that causes trouble,
-   * needing to distinguish between arguments starting with '-' and class names. If arguments
-   * are surrounded by ', Java takes the following string as is, hence an argument is mistakenly
-   * taken as a class name which happens to start with a '-'. The way to avoid this is to
-   * surround nothing with a ', but instead with a ".
-   *
-   * @param arg A single argument.
-   * @return Argument quoted for execution via YARN's generated shell script.
-   */
-  def escapeForShell(arg: String): String = {
-    if (arg != null) {
-      if (Utils.isWindows) {
-        YarnCommandBuilderUtils.quoteForBatchScript(arg)
-      } else {
-        val escaped = new StringBuilder("'")
-        for (i <- 0 to arg.length() - 1) {
-          arg.charAt(i) match {
-            case '$' => escaped.append("\\$")
-            case '"' => escaped.append("\\\"")
-            case '\'' => escaped.append("'\\''")
-            case c => escaped.append(c)
-          }
-        }
-        escaped.append("'").toString()
-      }
-    } else {
-      arg
-    }
-  }
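
A quick illustration of the Unix branch (the input string is hypothetical):

    YarnSparkHadoopUtil.escapeForShell("""a 'quoted' $arg""")
    // yields: 'a '\''quoted'\'' \$arg'
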
-
-  // YARN/Hadoop ACLs are specified as "user1,user2 group1,group2".
-  // Users and groups are separated by a space, hence we need to pass the ACLs in the same format.
-  def getApplicationAclsForYarn(securityMgr: SecurityManager)
-      : Map[ApplicationAccessType, String] = {
-    Map[ApplicationAccessType, String] (
-      ApplicationAccessType.VIEW_APP -> (securityMgr.getViewAcls + " " +
-        securityMgr.getViewAclsGroups),
-      ApplicationAccessType.MODIFY_APP -> (securityMgr.getModifyAcls + " " +
-        securityMgr.getModifyAclsGroups)
-    )
-  }
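
For example, with view ACLs "alice,bob", view ACL groups "admins", modify ACLs "alice" and modify ACL groups "ops" (all hypothetical), the returned map would be:

    Map(
      ApplicationAccessType.VIEW_APP -> "alice,bob admins",
      ApplicationAccessType.MODIFY_APP -> "alice ops")
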
-
-  /**
-   * Expand an environment variable using the YARN API.
-   * If Environment.$$() is implemented, return its result.
-   * Otherwise, return the result of Environment.$().
-   * Note: $$() was added in Hadoop 2.4.
-   */
-  private lazy val expandMethod =
-    Try(classOf[Environment].getMethod("$$"))
-      .getOrElse(classOf[Environment].getMethod("$"))
-
-  def expandEnvironment(environment: Environment): String =
-    expandMethod.invoke(environment).asInstanceOf[String]
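
For instance (the exact output depends on the Hadoop version, as the scaladoc notes):

    YarnSparkHadoopUtil.expandEnvironment(Environment.PWD)
    // Hadoop 2.4+: "{{PWD}}" (cross-platform expansion deferred to the NodeManager)
    // older:      "$PWD" on Unix-like systems, "%PWD%" on Windows
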
-
-  /**
-   * Get the class path separator using the YARN API.
-   * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it.
-   * Otherwise, return File.pathSeparator.
-   * Note: CLASS_PATH_SEPARATOR was added in Hadoop 2.4.
-   */
-  private lazy val classPathSeparatorField =
-    Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
-      .getOrElse(classOf[File].getField("pathSeparator"))
-
-  def getClassPathSeparator(): String = {
-    classPathSeparatorField.get(null).asInstanceOf[String]
-  }
-
-  /**
-   * Get the initial target number of executors, which depends on whether dynamic allocation
-   * is enabled.
-   * If dynamic allocation is not used, this returns the number of executors requested by the
-   * user.
-   */
-  def getInitialTargetExecutorNumber(
-      conf: SparkConf,
-      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
-    if (Utils.isDynamicAllocationEnabled(conf)) {
-      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
-      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
-      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
-      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
-        s"initial executor number $initialNumExecutors must be between min executor number " +
-          s"$minNumExecutors and max executor number $maxNumExecutors")
-
-      initialNumExecutors
-    } else {
-      val targetNumExecutors =
-        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
-      // System property can override environment variable.
-      conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors)
-    }
-  }
-}
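
A usage sketch with dynamic allocation enabled (all config values hypothetical):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.initialExecutors", "4")
      .set("spark.dynamicAllocation.maxExecutors", "10")
    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)  // 4

Without dynamic allocation, spark.executor.instances (or SPARK_EXECUTOR_INSTANCES, or the default of 2) determines the result instead.
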
-

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
deleted file mode 100644
index 666cb45..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.spark.internal.config.ConfigBuilder
-import org.apache.spark.network.util.ByteUnit
-
-package object config {
-
-  /* Common app configuration. */
-
-  private[spark] val APPLICATION_TAGS = ConfigBuilder("spark.yarn.tags")
-    .doc("Comma-separated list of strings to pass through as YARN application 
tags appearing " +
-      "in YARN Application Reports, which can be used for filtering when 
querying YARN.")
-    .stringConf
-    .toSequence
-    .createOptional
-
-  private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
-    ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
-      .doc("Interval after which AM failures will be considered independent 
and " +
-        "not accumulate towards the attempt count.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createOptional
-
-  private[spark] val AM_PORT =
-    ConfigBuilder("spark.yarn.am.port")
-      .intConf
-      .createWithDefault(0)
-
-  private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
-    ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
-      .doc("Interval after which Executor failures will be considered 
independent and not " +
-        "accumulate towards the attempt count.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createOptional
-
-  private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
-    .doc("Maximum number of AM attempts before failing the app.")
-    .intConf
-    .createOptional
-
-  private[spark] val USER_CLASS_PATH_FIRST = ConfigBuilder("spark.yarn.user.classpath.first")
-    .doc("Whether to place user jars in front of Spark's classpath.")
-    .booleanConf
-    .createWithDefault(false)
-
-  private[spark] val GATEWAY_ROOT_PATH = ConfigBuilder("spark.yarn.config.gatewayPath")
-    .doc("Root of configuration paths that is present on gateway nodes, and will be replaced " +
-      "with the corresponding path in cluster machines.")
-    .stringConf
-    .createWithDefault(null)
-
-  private[spark] val REPLACEMENT_ROOT_PATH = ConfigBuilder("spark.yarn.config.replacementPath")
-    .doc(s"Path to use as a replacement for ${GATEWAY_ROOT_PATH.key} when launching processes " +
-      "in the YARN cluster.")
-    .stringConf
-    .createWithDefault(null)
-
-  private[spark] val QUEUE_NAME = ConfigBuilder("spark.yarn.queue")
-    .stringConf
-    .createWithDefault("default")
-
-  private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address")
-    .stringConf
-    .createOptional
-
-  /* File distribution. */
-
-  private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
-    .doc("Location of archive containing jars files with Spark classes.")
-    .stringConf
-    .createOptional
-
-  private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
-    .doc("Location of jars containing Spark classes.")
-    .stringConf
-    .toSequence
-    .createOptional
-
-  private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
-    .doc("Whether to preserve temporary files created by the job in HDFS.")
-    .booleanConf
-    .createWithDefault(false)
-
-  private[spark] val STAGING_FILE_REPLICATION = ConfigBuilder("spark.yarn.submit.file.replication")
-    .doc("Replication factor for files uploaded by Spark to HDFS.")
-    .intConf
-    .createOptional
-
-  private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
-    .doc("Staging directory used while submitting applications.")
-    .stringConf
-    .createOptional
-
-  /* Cluster-mode launcher configuration. */
-
-  private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
-    .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
-      "launcher process.")
-    .booleanConf
-    .createWithDefault(true)
-
-  private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
-    .doc("Interval between reports of the current app status in cluster mode.")
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createWithDefaultString("1s")
-
-  /* Shared Client-mode AM / Driver configuration. */
-
-  private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createWithDefaultString("100s")
-
-  private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
-    .doc("Node label expression for the AM.")
-    .stringConf
-    .createOptional
-
-  private[spark] val CONTAINER_LAUNCH_MAX_THREADS =
-    ConfigBuilder("spark.yarn.containerLauncherMaxThreads")
-      .intConf
-      .createWithDefault(25)
-
-  private[spark] val MAX_EXECUTOR_FAILURES = ConfigBuilder("spark.yarn.max.executor.failures")
-    .intConf
-    .createOptional
-
-  private[spark] val MAX_REPORTER_THREAD_FAILURES =
-    ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures")
-      .intConf
-      .createWithDefault(5)
-
-  private[spark] val RM_HEARTBEAT_INTERVAL =
-    ConfigBuilder("spark.yarn.scheduler.heartbeat.interval-ms")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("3s")
-
-  private[spark] val INITIAL_HEARTBEAT_INTERVAL =
-    ConfigBuilder("spark.yarn.scheduler.initial-allocation.interval")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("200ms")
-
-  private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
-    .doc("A comma-separated list of class names of services to add to the 
scheduler.")
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  /* Client-mode AM configuration. */
-
-  private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores")
-    .intConf
-    .createWithDefault(1)
-
-  private[spark] val AM_JAVA_OPTIONS = ConfigBuilder("spark.yarn.am.extraJavaOptions")
-    .doc("Extra Java options for the client-mode AM.")
-    .stringConf
-    .createOptional
-
-  private[spark] val AM_LIBRARY_PATH = ConfigBuilder("spark.yarn.am.extraLibraryPath")
-    .doc("Extra native library path for the client-mode AM.")
-    .stringConf
-    .createOptional
-
-  private[spark] val AM_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.am.memoryOverhead")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
-  private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory")
-    .bytesConf(ByteUnit.MiB)
-    .createWithDefaultString("512m")
-
-  /* Driver configuration. */
-
-  private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
-    .intConf
-    .createWithDefault(1)
-
-  private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
-  /* Executor configuration. */
-
-  private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
-    .intConf
-    .createWithDefault(1)
-
-  private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
-  private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
-    ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
-      .doc("Node label expression for executors.")
-      .stringConf
-      .createOptional
-
-  /* Security configuration. */
-
-  private[spark] val CREDENTIAL_FILE_MAX_COUNT =
-    ConfigBuilder("spark.yarn.credentials.file.retention.count")
-      .intConf
-      .createWithDefault(5)
-
-  private[spark] val CREDENTIALS_FILE_MAX_RETENTION =
-    ConfigBuilder("spark.yarn.credentials.file.retention.days")
-      .intConf
-      .createWithDefault(5)
-
-  private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
-    .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
-      "fs.defaultFS does not need to be listed here.")
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  /* Rolled log aggregation configuration. */
-
-  private[spark] val ROLLED_LOG_INCLUDE_PATTERN =
-    ConfigBuilder("spark.yarn.rolledLog.includePattern")
-      .doc("Java Regex to filter the log files which match the defined include 
pattern and those " +
-        "log files will be aggregated in a rolling fashion.")
-      .stringConf
-      .createOptional
-
-  private[spark] val ROLLED_LOG_EXCLUDE_PATTERN =
-    ConfigBuilder("spark.yarn.rolledLog.excludePattern")
-      .doc("Java Regex to filter the log files which match the defined exclude 
pattern and those " +
-        "log files will not be aggregated in a rolling fashion.")
-      .stringConf
-      .createOptional
-
-  /* Private configs. */
-
-  private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
-    .internal()
-    .stringConf
-    .createWithDefault(null)
-
-  // Internal config to propagate the location of the user's jar to the driver/executors
-  private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar")
-    .internal()
-    .stringConf
-    .createOptional
-
-  // Internal config to propagate the locations of any extra jars to add to the classpath
-  // of the executors
-  private[spark] val SECONDARY_JARS = ConfigBuilder("spark.yarn.secondary.jars")
-    .internal()
-    .stringConf
-    .toSequence
-    .createOptional
-
-  /* Configuration and cached file propagation. */
-
-  private[spark] val CACHED_FILES = ConfigBuilder("spark.yarn.cache.filenames")
-    .internal()
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  private[spark] val CACHED_FILES_SIZES = ConfigBuilder("spark.yarn.cache.sizes")
-    .internal()
-    .longConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  private[spark] val CACHED_FILES_TIMESTAMPS = ConfigBuilder("spark.yarn.cache.timestamps")
-    .internal()
-    .longConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  private[spark] val CACHED_FILES_VISIBILITIES = ConfigBuilder("spark.yarn.cache.visibilities")
-    .internal()
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  // Either "file" or "archive", for each file.
-  private[spark] val CACHED_FILES_TYPES = ConfigBuilder("spark.yarn.cache.types")
-    .internal()
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
-  // The location of the conf archive in HDFS.
-  private[spark] val CACHED_CONF_ARCHIVE = ConfigBuilder("spark.yarn.cache.confArchive")
-    .internal()
-    .stringConf
-    .createOptional
-
-  private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime")
-    .internal()
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createWithDefault(Long.MaxValue)
-
-  private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime")
-    .internal()
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createWithDefault(Long.MaxValue)
-
-  // The list of cache-related config entries. This is used by Client and the AM to clean
-  // up the environment so that these settings do not appear on the web UI.
-  private[yarn] val CACHE_CONFIGS = Seq(
-    CACHED_FILES,
-    CACHED_FILES_SIZES,
-    CACHED_FILES_TIMESTAMPS,
-    CACHED_FILES_VISIBILITIES,
-    CACHED_FILES_TYPES,
-    CACHED_CONF_ARCHIVE)
-
-}
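
For context, entries declared with ConfigBuilder are read back through SparkConf with their declared types; a minimal sketch, usable from inside the org.apache.spark namespace since the typed get is private[spark] (the queue value is hypothetical):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    val conf = new SparkConf().set("spark.yarn.queue", "production")
    val queue: String = conf.get(QUEUE_NAME)                   // "production"
    val maxAttempts: Option[Int] = conf.get(MAX_APP_ATTEMPTS)  // None unless set
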

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
deleted file mode 100644
index 7e76f40..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.yarn.security
-
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{Executors, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.util.ThreadUtils
-
-/**
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while accessing secured services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
- * This method wakes up a thread that logs into the KDC once 75% of the renewal interval of the
- * original credentials used for the container has elapsed. It then obtains new credentials and
- * writes them to HDFS in a pre-specified location - the prefix of which is specified in the
- * sparkConf by spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1,
- * c-timestamp2-2, etc. - each update goes to a new file, with a monotonically increasing
- * suffix), where timestamp1, timestamp2 indicate the time of the next update for the
- * CredentialUpdater. After this, the credentials are renewed once 75% of the new tokens'
- * renewal interval has elapsed.
- *
- * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it read). If a new file
- * has appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of the new credentials has expired.
- */
-private[yarn] class AMCredentialRenewer(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    credentialManager: ConfigurableCredentialManager) extends Logging {
-
-  private var lastCredentialsFileSuffix = 0
-
-  private val credentialRenewer =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
-
-  private val hadoopUtil = YarnSparkHadoopUtil.get
-
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
-  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
-  private val freshHadoopConf =
-    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
-
-  @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
-
-  /**
-   * Schedule a login from the keytab and principal set using the --principal and --keytab
-   * arguments to spark-submit. This login happens only when the credentials of the current
-   * user are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab
-   * from SparkConf to do the login. This method is a no-op in non-YARN mode.
-   */
-  private[spark] def scheduleLoginFromKeytab(): Unit = {
-    val principal = sparkConf.get(PRINCIPAL).get
-    val keytab = sparkConf.get(KEYTAB).get
-
-    /**
-     * Schedule re-login and creation of new credentials. If credentials have already expired,
-     * this method will synchronously create new ones.
-     */
-    def scheduleRenewal(runnable: Runnable): Unit = {
-      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
-      if (remainingTime <= 0) {
-        // Run now!
-        logInfo("Credentials have expired, creating new ones now.")
-        runnable.run()
-      } else {
-        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
-        credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
-      }
-    }
-
-    // This thread periodically runs on the AM to update the credentials on HDFS.
-    val credentialRenewerRunnable =
-      new Runnable {
-        override def run(): Unit = {
-          try {
-            writeNewCredentialsToHDFS(principal, keytab)
-            cleanupOldFiles()
-          } catch {
-            case e: Exception =>
-              // Log the error and try to write new tokens back in an hour
-              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
-                "hour! If this happens too often tasks will fail.", e)
-              credentialRenewer.schedule(this, 1, TimeUnit.HOURS)
-              return
-          }
-          scheduleRenewal(this)
-        }
-      }
-    // Schedule update of credentials. This handles the case of updating the credentials right
-    // now as well, since the renewal interval will be 0, and the thread will get scheduled
-    // immediately.
-    scheduleRenewal(credentialRenewerRunnable)
-  }
-
-  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
-  // least numFilesToKeep files are kept for safety.
-  private def cleanupOldFiles(): Unit = {
-    import scala.concurrent.duration._
-    try {
-      val remoteFs = FileSystem.get(freshHadoopConf)
-      val credentialsPath = new Path(credentialsFile)
-      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .dropRight(numFilesToKeep)
-        .takeWhile(_.getModificationTime < thresholdTime)
-        .foreach(x => remoteFs.delete(x.getPath, true))
-    } catch {
-      // Such errors are not fatal, so don't throw. Make sure they are logged though.
-      case e: Exception =>
-        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
-          "such warnings there may be an issue with your HDFS cluster.", e)
-    }
-  }
-
-  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
-    // Keytab is copied by YARN to the working directory of the AM, so the full path is
-    // not needed.
-
-    // HACK:
-    // HDFS will not issue new delegation tokens if the Credentials object passed in already
-    // has tokens for that FS, even if the tokens are expired (it really only checks if there
-    // are tokens for the service, and not if they are valid). So the only real way to get new
-    // tokens is to make sure a different Credentials object is used each time to get new
-    // tokens, and then the new tokens are copied over the current user's Credentials.
-    // So:
-    // - we login as a different user and get the UGI
-    // - use that UGI to get the tokens (see doAs block below)
-    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
-    //   in the current user's Credentials object for this FS).
-    // The login to the KDC happens each time new tokens are required, but this is rare enough
-    // not to have to worry about (like once every day or so). This makes this code clearer than
-    // having to login and then relogin every time (the HDFS API may not relogin since we don't
-    // use this UGI directly for HDFS communication).
-    logInfo(s"Attempting to login to KDC using principal: $principal")
-    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-    logInfo("Successfully logged into KDC.")
-    val tempCreds = keytabLoggedInUGI.getCredentials
-    val credentialsPath = new Path(credentialsFile)
-    val dst = credentialsPath.getParent
-    var nearestNextRenewalTime = Long.MaxValue
-    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
-      // Get a copy of the credentials
-      override def run(): Void = {
-        nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds)
-        null
-      }
-    })
-
-    val currTime = System.currentTimeMillis()
-    val timeOfNextUpdate = if (nearestNextRenewalTime <= currTime) {
-      // If the next renewal time is earlier than the current time, set the next renewal time to
-      // the current time; this will trigger the next renewal immediately. Also set the next
-      // update time to the current time. There is still a gap between token renewal and update,
-      // which may potentially introduce issues.
-      logWarning(s"Next credential renewal time ($nearestNextRenewalTime) is earlier than " +
-        s"current time ($currTime), which is unexpected, please check your credential renewal " +
-        "related configurations in the target services.")
-      timeOfNextRenewal = currTime
-      currTime
-    } else {
-      // The next valid renewal time is about 75% of the credential renewal interval, and the
-      // update time is slightly later than that (80% of the renewal interval).
-      timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
-      ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
-    }
-
-    // Add the temp credentials back to the original ones.
-    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(freshHadoopConf)
-    // If lastCredentialsFileSuffix is 0, then the AM was either started or restarted. If the AM
-    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
-    // and update the lastCredentialsFileSuffix.
-    if (lastCredentialsFileSuffix == 0) {
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.foreach { status =>
-          lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
-        }
-    }
-    val nextSuffix = lastCredentialsFileSuffix + 1
-
-    val tokenPathStr =
-      credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
-        timeOfNextUpdate.toLong.toString + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
-          nextSuffix
-    val tokenPath = new Path(tokenPathStr)
-    val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-
-    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
-    logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
-    remoteFs.rename(tempTokenPath, tokenPath)
-    logInfo("Delegation token file rename complete.")
-    lastCredentialsFileSuffix = nextSuffix
-  }
-
-  def stop(): Unit = {
-    credentialRenewer.shutdown()
-  }
-}
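
To make the 75% / 80% schedule concrete, a worked sketch (all times hypothetical):

    // Suppose tokens are obtained now and their renewal is due in 24 hours.
    val currTime = System.currentTimeMillis()
    val nearestNextRenewalTime = currTime + 24L * 60 * 60 * 1000
    // The AM re-logs in and writes new tokens at 75% of the interval (18 hours from now)...
    val timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
    // ...and the executors/driver pick the new file up at 80% (19.2 hours from now).
    val timeOfNextUpdate = ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
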

