This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0e2ca11 [SPARK-29149][YARN] Update YARN cluster manager For Stage
Level Scheduling
0e2ca11 is described below
commit 0e2ca11d80c3921387d7b077cb64c3a0c06b08d7
Author: Thomas Graves <[email protected]>
AuthorDate: Fri Feb 28 15:23:33 2020 -0600
[SPARK-29149][YARN] Update YARN cluster manager For Stage Level Scheduling
### What changes were proposed in this pull request?
YARN-side changes for stage level scheduling. The previous PR, for the dynamic
allocation changes, was https://github.com/apache/spark/pull/27313
Modified the data structures to store things on a per-ResourceProfile basis.
I tried to keep the code changes to a minimum: the main loop that requests
containers now just iterates over each ResourceProfile, and the logic inside
for each one stayed very close to the original.
On submission we now have to give each ResourceProfile a separate YARN
Priority, because YARN doesn't support asking for containers with different
resources at the same Priority. We just use the profile id as the priority
level.
Using a different Priority actually makes it easier, when the containers
come back, to match them to the ResourceProfile they were requested for.
The expectation is that YARN will only give you a container with the resource
amounts you requested or more. It should never give you a container that
doesn't satisfy your resource requests.
To see the full set of feature changes, refer to
https://github.com/apache/spark/pull/27053/files
### Why are the changes needed?
To add YARN support for stage level scheduling.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
Tested manually on a YARN cluster, and with unit tests.
Closes #27583 from tgravescs/SPARK-29149.
Authored-by: Thomas Graves <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
---
.../cluster/CoarseGrainedClusterMessage.scala | 6 +-
.../org/apache/spark/HeartbeatReceiverSuite.scala | 30 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 9 +-
.../deploy/yarn/ApplicationMasterSource.scala | 4 +-
...calityPreferredContainerPlacementStrategy.scala | 25 +-
.../spark/deploy/yarn/ResourceRequestHelper.scala | 11 +
.../apache/spark/deploy/yarn/YarnAllocator.scala | 556 ++++++++++++++-------
.../scheduler/cluster/YarnSchedulerBackend.scala | 8 +-
.../yarn/ContainerPlacementStrategySuite.scala | 46 +-
.../yarn/LocalityPlacementStrategySuite.scala | 6 +-
.../spark/deploy/yarn/YarnAllocatorSuite.scala | 258 +++++++---
.../cluster/YarnSchedulerBackendSuite.scala | 20 +-
12 files changed, 663 insertions(+), 316 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 8db0122..465c0d2 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -117,9 +117,9 @@ private[spark] object CoarseGrainedClusterMessages {
// Request executors by specifying the new total number of executors desired
// This includes executors already pending or running
case class RequestExecutors(
- requestedTotal: Int,
- localityAwareTasks: Int,
- hostToLocalTaskCount: Map[String, Int],
+ resourceProfileToTotalExecs: Map[ResourceProfile, Int],
+ numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+ hostToLocalTaskCount: Map[Int, Map[String, Int]],
nodeBlacklist: Set[String])
extends CoarseGrainedClusterMessage
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index a929695..3126913 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
-import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef,
RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -61,6 +61,7 @@ class HeartbeatReceiverSuite
PrivateMethod[collection.Map[String, Long]](Symbol("executorLastSeen"))
private val _executorTimeoutMs =
PrivateMethod[Long](Symbol("executorTimeoutMs"))
private val _killExecutorThread =
PrivateMethod[ExecutorService](Symbol("killExecutorThread"))
+ var conf: SparkConf = _
/**
* Before each test, set up the SparkContext and a custom
[[HeartbeatReceiver]]
@@ -68,7 +69,7 @@ class HeartbeatReceiverSuite
*/
override def beforeEach(): Unit = {
super.beforeEach()
- val conf = new SparkConf()
+ conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
.set(DYN_ALLOCATION_TESTING, true)
@@ -76,7 +77,6 @@ class HeartbeatReceiverSuite
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
- when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -164,9 +164,10 @@ class HeartbeatReceiverSuite
test("expire dead hosts should kill executors with replacement
(SPARK-8119)") {
// Set up a fake backend and cluster manager to simulate killing executors
val rpcEnv = sc.env.rpcEnv
- val fakeClusterManager = new FakeClusterManager(rpcEnv)
+ val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm",
fakeClusterManager)
- val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv,
fakeClusterManagerRef)
+ val fakeSchedulerBackend =
+ new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef,
sc.resourceProfileManager)
when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
// Register fake executors with our fake scheduler backend
@@ -282,18 +283,16 @@ private class FakeExecutorEndpoint(override val rpcEnv:
RpcEnv) extends RpcEndpo
private class FakeSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
- clusterManagerEndpoint: RpcEndpointRef)
+ clusterManagerEndpoint: RpcEndpointRef,
+ resourceProfileManager: ResourceProfileManager)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
protected override def doRequestTotalExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean]
= {
clusterManagerEndpoint.ask[Boolean](
- RequestExecutors(
-
resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)),
-
numLocalityAwareTasksPerResourceProfileId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
- rpHostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
- Set.empty))
- }
+ RequestExecutors(resourceProfileToTotalExecs,
numLocalityAwareTasksPerResourceProfileId,
+ rpHostToLocalTaskCount, Set.empty))
+}
protected override def doKillExecutors(executorIds: Seq[String]):
Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
@@ -303,7 +302,7 @@ private class FakeSchedulerBackend(
/**
* Dummy cluster manager to simulate responses to executor allocation requests.
*/
-private class FakeClusterManager(override val rpcEnv: RpcEnv) extends
RpcEndpoint {
+private class FakeClusterManager(override val rpcEnv: RpcEnv, conf: SparkConf)
extends RpcEndpoint {
private var targetNumExecutors = 0
private val executorIdsToKill = new mutable.HashSet[String]
@@ -311,8 +310,9 @@ private class FakeClusterManager(override val rpcEnv:
RpcEnv) extends RpcEndpoin
def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
Unit] = {
- case RequestExecutors(requestedTotal, _, _, _) =>
- targetNumExecutors = requestedTotal
+ case RequestExecutors(resourceProfileToTotalExecs, _, _, _) =>
+ targetNumExecutors =
+
resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf))
context.reply(true)
case KillExecutors(executorIds) =>
executorIdsToKill ++= executorIds
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1e8f408..43cd745 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -593,7 +593,7 @@ private[spark] class ApplicationMaster(
}
}
try {
- val numPendingAllocate = allocator.getPendingAllocate.size
+ val numPendingAllocate = allocator.getNumContainersPendingAllocate
var sleepStartNs = 0L
var sleepInterval = 200L // ms
allocatorLock.synchronized {
@@ -778,8 +778,11 @@ private[spark] class ApplicationMaster(
case r: RequestExecutors =>
Option(allocator) match {
case Some(a) =>
- if
(a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
- r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
+ if (a.requestTotalExecutorsWithPreferredLocalities(
+ r.resourceProfileToTotalExecs,
+ r.numLocalityAwareTasksPerResourceProfileId,
+ r.hostToLocalTaskCount,
+ r.nodeBlacklist)) {
resetAllocatorInterval()
}
context.reply(true)
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
index 0fec916..62ac17c 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
@@ -40,11 +40,11 @@ private[spark] class ApplicationMasterSource(prefix:
String, yarnAllocator: Yarn
})
metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new
Gauge[Int] {
- override def getValue: Int = yarnAllocator.numLocalityAwareTasks
+ override def getValue: Int = yarnAllocator.getNumLocalityAwareTasks
})
metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"),
new Gauge[Int] {
- override def getValue: Int = yarnAllocator.numContainersPendingAllocate
+ override def getValue: Int = yarnAllocator.getNumContainersPendingAllocate
})
}
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 2288bb5..a6380ab 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId,
Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.SparkConf
-import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceProfile
private[yarn] case class ContainerLocalityPreferences(nodes: Array[String],
racks: Array[String])
@@ -82,7 +82,6 @@ private[yarn] case class ContainerLocalityPreferences(nodes:
Array[String], rack
private[yarn] class LocalityPreferredContainerPlacementStrategy(
val sparkConf: SparkConf,
val yarnConf: Configuration,
- val resource: Resource,
resolver: SparkRackResolver) {
/**
@@ -96,6 +95,7 @@ private[yarn] class
LocalityPreferredContainerPlacementStrategy(
* containers
* @param localityMatchedPendingAllocations A sequence of pending container
request which
* matches the localities of
current required tasks.
+ * @param rp The ResourceProfile associated with this container.
* @return node localities and rack localities, each locality is an array of
string,
* the length of localities is the same as number of containers
*/
@@ -104,11 +104,12 @@ private[yarn] class
LocalityPreferredContainerPlacementStrategy(
numLocalityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
- localityMatchedPendingAllocations: Seq[ContainerRequest]
+ localityMatchedPendingAllocations: Seq[ContainerRequest],
+ rp: ResourceProfile
): Array[ContainerLocalityPreferences] = {
val updatedHostToContainerCount = expectedHostToContainerCount(
numLocalityAwareTasks, hostToLocalTaskCount,
allocatedHostToContainersMap,
- localityMatchedPendingAllocations)
+ localityMatchedPendingAllocations, rp)
val updatedLocalityAwareContainerNum =
updatedHostToContainerCount.values.sum
// The number of containers to allocate, divided into two groups, one with
preferred locality,
@@ -152,11 +153,14 @@ private[yarn] class
LocalityPreferredContainerPlacementStrategy(
}
/**
- * Calculate the number of executors need to satisfy the given number of
pending tasks.
+ * Calculate the number of executors needed to satisfy the given number of
pending tasks for
+ * the ResourceProfile.
*/
- private def numExecutorsPending(numTasksPending: Int): Int = {
- val coresPerExecutor = resource.getVirtualCores
- (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) /
coresPerExecutor
+ private def numExecutorsPending(
+ numTasksPending: Int,
+ rp: ResourceProfile): Int = {
+ val tasksPerExec = rp.maxTasksPerExecutor(sparkConf)
+ math.ceil(numTasksPending / tasksPerExec.toDouble).toInt
}
/**
@@ -175,14 +179,15 @@ private[yarn] class
LocalityPreferredContainerPlacementStrategy(
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
- localityMatchedPendingAllocations: Seq[ContainerRequest]
+ localityMatchedPendingAllocations: Seq[ContainerRequest],
+ rp: ResourceProfile
): Map[String, Int] = {
val totalLocalTaskNum = hostToLocalTaskCount.values.sum
val pendingHostToContainersMap =
pendingHostToContainerCount(localityMatchedPendingAllocations)
hostToLocalTaskCount.map { case (host, count) =>
val expectedCount =
- count.toDouble * numExecutorsPending(localityAwareTasks) /
totalLocalTaskNum
+ count.toDouble * numExecutorsPending(localityAwareTasks, rp) /
totalLocalTaskNum
// Take the locality of pending containers into consideration
val existedCount =
allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
pendingHostToContainersMap.getOrElse(host, 0.0)
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index ae316b0..3d800be 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -227,6 +227,17 @@ private object ResourceRequestHelper extends Logging {
resourceInformation
}
+ def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = {
+ try {
+ // Use reflection as this uses APIs only available in Hadoop 3
+ val getResourcesMethod = resource.getClass().getMethod("getResources")
+ val resources =
getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]]
+ if (resources.nonEmpty) true else false
+ } catch {
+ case _: NoSuchMethodException => false
+ }
+ }
+
/**
* Checks whether Hadoop 2.x or 3 is used as a dependency.
* In case of Hadoop 3 and later, the ResourceInformation class
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 09414cb..cd0e7d5 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,9 +17,9 @@
package org.apache.spark.deploy.yarn
-import java.util.Collections
-import java.util.concurrent._
+import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -39,6 +39,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -75,19 +76,69 @@ private[yarn] class YarnAllocator(
import YarnAllocator._
// Visible for testing.
- val allocatedHostToContainersMap = new HashMap[String,
collection.mutable.Set[ContainerId]]
+ @GuardedBy("this")
+ val allocatedHostToContainersMapPerRPId =
+ new HashMap[Int, HashMap[String, collection.mutable.Set[ContainerId]]]
+
+ @GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
// Containers that we no longer care about. We've either already told the RM
to release them or
// will on the next heartbeat. Containers get removed from this map after
the RM tells us they've
// completed.
- private val releasedContainers = Collections.newSetFromMap[ContainerId](
- new ConcurrentHashMap[ContainerId, java.lang.Boolean])
+ @GuardedBy("this")
+ private val releasedContainers = collection.mutable.HashSet[ContainerId]()
+
+ @GuardedBy("this")
+ private val runningExecutorsPerResourceProfileId = new HashMap[Int,
mutable.Set[String]]()
- private val runningExecutors = Collections.newSetFromMap[String](
- new ConcurrentHashMap[String, java.lang.Boolean]())
+ @GuardedBy("this")
+ private val numExecutorsStartingPerResourceProfileId = new HashMap[Int,
AtomicInteger]
- private val numExecutorsStarting = new AtomicInteger(0)
+ @GuardedBy("this")
+ private val targetNumExecutorsPerResourceProfileId = new
mutable.HashMap[Int, Int]
+
+ // Executor loss reason requests that are pending - maps from executor ID
for inquiry to a
+ // list of requesters that should be responded to once we find out why the
given executor
+ // was lost.
+ @GuardedBy("this")
+ private val pendingLossReasonRequests = new HashMap[String,
mutable.Buffer[RpcCallContext]]
+
+ // Maintain loss reasons for already released executors, it will be added
when executor loss
+ // reason is got from AM-RM call, and be removed after querying this loss
reason.
+ @GuardedBy("this")
+ private val releasedExecutorLossReasons = new HashMap[String,
ExecutorLossReason]
+
+ // Keep track of which container is running which executor to remove the
executors later
+ // Visible for testing.
+ @GuardedBy("this")
+ private[yarn] val executorIdToContainer = new HashMap[String, Container]
+
+ @GuardedBy("this")
+ private var numUnexpectedContainerRelease = 0L
+
+ @GuardedBy("this")
+ private val containerIdToExecutorIdAndResourceProfileId = new
HashMap[ContainerId, (String, Int)]
+
+ // Use a ConcurrentHashMap because this is used in matchContainerToRequest,
which is called
+ // from the rack resolver thread where synchronize(this) on this would cause
a deadlock.
+ @GuardedBy("ConcurrentHashMap")
+ private[yarn] val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]()
+
+ // note currently we don't remove ResourceProfiles
+ @GuardedBy("this")
+ private[yarn] val rpIdToResourceProfile = new mutable.HashMap[Int,
ResourceProfile]
+
+ // A map of ResourceProfile id to a map of preferred hostname and possible
+ // task numbers running on it.
+ @GuardedBy("this")
+ private var hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String,
Int]] =
+ Map(DEFAULT_RESOURCE_PROFILE_ID -> Map.empty)
+
+ // ResourceProfile Id to number of tasks that have locality preferences in
active stages
+ @GuardedBy("this")
+ private[yarn] var numLocalityAwareTasksPerResourceProfileId: Map[Int, Int] =
+ Map(DEFAULT_RESOURCE_PROFILE_ID -> 0)
/**
* Used to generate a unique ID per executor
@@ -102,6 +153,7 @@ private[yarn] class YarnAllocator(
*
* @see SPARK-12864
*/
+ @GuardedBy("this")
private var executorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
@@ -110,26 +162,6 @@ private[yarn] class YarnAllocator(
private val allocatorBlacklistTracker =
new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)
- @volatile private var targetNumExecutors =
- SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
-
-
- // Executor loss reason requests that are pending - maps from executor ID
for inquiry to a
- // list of requesters that should be responded to once we find out why the
given executor
- // was lost.
- private val pendingLossReasonRequests = new HashMap[String,
mutable.Buffer[RpcCallContext]]
-
- // Maintain loss reasons for already released executors, it will be added
when executor loss
- // reason is got from AM-RM call, and be removed after querying this loss
reason.
- private val releasedExecutorLossReasons = new HashMap[String,
ExecutorLossReason]
-
- // Keep track of which container is running which executor to remove the
executors later
- // Visible for testing.
- private[yarn] val executorIdToContainer = new HashMap[String, Container]
-
- private var numUnexpectedContainerRelease = 0L
- private val containerIdToExecutorId = new HashMap[ContainerId, String]
-
// Executor memory in MiB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Executor offHeap memory in MiB.
@@ -142,17 +174,18 @@ private[yarn] class YarnAllocator(
} else {
0
}
- // Number of cores per executor.
- protected val executorCores = sparkConf.get(EXECUTOR_CORES)
+ // Number of cores per executor for the default profile
+ protected val defaultExecutorCores = sparkConf.get(EXECUTOR_CORES)
private val executorResourceRequests =
getYarnResourcesAndAmounts(sparkConf,
config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
- // Resource capability requested for each executor
- private[yarn] val resource: Resource = {
- val resource = Resource.newInstance(
- executorMemory + executorOffHeapMemory + memoryOverhead +
pysparkWorkerMemory, executorCores)
+ // Resource capability requested for each executor for the default profile
+ private[yarn] val defaultResource: Resource = {
+ val resource: Resource = Resource.newInstance(
+ executorMemory + executorOffHeapMemory + memoryOverhead +
pysparkWorkerMemory,
+ defaultExecutorCores)
ResourceRequestHelper.setResourceRequests(executorResourceRequests,
resource)
logDebug(s"Created resource capability: $resource")
resource
@@ -166,19 +199,42 @@ private[yarn] class YarnAllocator(
private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
- // A map to store preferred hostname and possible task numbers running on it.
- private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
-
- // Number of tasks that have locality preferences in active stages
- private[yarn] var numLocalityAwareTasks: Int = 0
-
// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
- new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource,
resolver)
+ new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
+
+ // The default profile is always present so we need to initialize the
datastructures keyed by
+ // ResourceProfile id to ensure its present if things start running before a
request for
+ // executors could add it. This approach is easier then going and special
casing everywhere.
+ private def initDefaultProfile(): Unit = synchronized {
+ allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
+ new HashMap[String, mutable.Set[ContainerId]]()
+ runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID,
mutable.HashSet[String]())
+ numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
new AtomicInteger(0)
+ targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
+ SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
+ rpIdToYarnResource.put(DEFAULT_RESOURCE_PROFILE_ID, defaultResource)
+ rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
+ ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+ }
+
+ initDefaultProfile()
- def getNumExecutorsRunning: Int = runningExecutors.size()
+ def getNumExecutorsRunning: Int = synchronized {
+ runningExecutorsPerResourceProfileId.values.map(_.size).sum
+ }
+
+ def getNumLocalityAwareTasks: Int = synchronized {
+ numLocalityAwareTasksPerResourceProfileId.values.sum
+ }
- def getNumReleasedContainers: Int = releasedContainers.size()
+ def getNumExecutorsStarting: Int = synchronized {
+ numExecutorsStartingPerResourceProfileId.values.map(_.get()).sum
+ }
+
+ def getNumReleasedContainers: Int = synchronized {
+ releasedContainers.size
+ }
def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
@@ -186,49 +242,147 @@ private[yarn] class YarnAllocator(
/**
* A sequence of pending container requests that have not yet been fulfilled.
+ * ResourceProfile id -> pendingAllocate container request
*/
- def getPendingAllocate: Seq[ContainerRequest] =
getPendingAtLocation(ANY_HOST)
+ def getPendingAllocate: Map[Int, Seq[ContainerRequest]] =
getPendingAtLocation(ANY_HOST)
- def numContainersPendingAllocate: Int = synchronized {
- getPendingAllocate.size
+ def getNumContainersPendingAllocate: Int = synchronized {
+ getPendingAllocate.values.flatten.size
+ }
+
+ // YARN priorities are such that lower number is higher priority.
+ // We need to allocate a different priority for each ResourceProfile because
YARN
+ // won't allow different container resource requirements within a Priority.
+ // We could allocate per Stage to make sure earlier stages get priority but
Spark
+ // always finishes a stage before starting a later one and if we have 2
running in parallel
+ // the priority doesn't matter.
+ // We are using the ResourceProfile id as the priority.
+ private def getContainerPriority(rpId: Int): Priority = {
+ Priority.newInstance(rpId)
+ }
+
+ // The ResourceProfile id is the priority
+ private def getResourceProfileIdFromPriority(priority: Priority): Int = {
+ priority.getPriority()
+ }
+
+ private def getOrUpdateAllocatedHostToContainersMapForRPId(
+ rpId: Int): HashMap[String, collection.mutable.Set[ContainerId]] =
synchronized {
+ allocatedHostToContainersMapPerRPId.getOrElseUpdate(rpId,
+ new HashMap[String, mutable.Set[ContainerId]]())
+ }
+
+ private def getOrUpdateRunningExecutorForRPId(rpId: Int):
mutable.Set[String] = synchronized {
+ runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId,
mutable.HashSet[String]())
+ }
+
+ private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger
= synchronized {
+ numExecutorsStartingPerResourceProfileId.getOrElseUpdate(rpId, new
AtomicInteger(0))
+ }
+
+ private def getOrUpdateTargetNumExecutorsForRPId(rpId: Int): Int =
synchronized {
+ targetNumExecutorsPerResourceProfileId.getOrElseUpdate(rpId,
+ SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf))
}
/**
- * A sequence of pending container requests at the given location that have
not yet been
- * fulfilled.
+ * A sequence of pending container requests at the given location for each
ResourceProfile id
+ * that have not yet been fulfilled.
*/
- private def getPendingAtLocation(location: String): Seq[ContainerRequest] =
- amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location,
resource).asScala
- .flatMap(_.asScala)
+ private def getPendingAtLocation(
+ location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
+ val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
+ rpIdToResourceProfile.keys.map { id =>
+ val profResource = rpIdToYarnResource.get(id)
+ val result = amClient.getMatchingRequests(getContainerPriority(id),
location, profResource)
+ .asScala.flatMap(_.asScala)
+ allContainerRequests(id) = result
+ }
+ allContainerRequests.toMap
+ }
+
+ // if a ResourceProfile hasn't been seen yet, create the corresponding YARN
Resource for it
+ private def createYarnResourceForResourceProfile(
+ resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit =
synchronized {
+ resourceProfileToTotalExecs.foreach { case (rp, num) =>
+ if (!rpIdToYarnResource.containsKey(rp.id)) { // not contains(): on ConcurrentHashMap that is containsValue
+ // Start with the application or default settings
+ var heapMem = executorMemory.toLong
+ // Note we currently don't support off heap memory in ResourceProfile
- SPARK-30794
+ var offHeapMem = executorOffHeapMemory.toLong
+ var overheadMem = memoryOverhead.toLong
+ var pysparkMem = pysparkWorkerMemory.toLong
+ var cores = defaultExecutorCores
+ val customResources = new mutable.HashMap[String, String]
+ // track the resource profile if not already there
+ getOrUpdateRunningExecutorForRPId(rp.id)
+ logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
+ val execResources = rp.executorResources
+ execResources.foreach { case (r, execReq) =>
+ r match {
+ case ResourceProfile.MEMORY =>
+ heapMem = execReq.amount
+ case ResourceProfile.OVERHEAD_MEM =>
+ overheadMem = execReq.amount
+ case ResourceProfile.PYSPARK_MEM =>
+ pysparkMem = execReq.amount
+ case ResourceProfile.CORES =>
+ cores = execReq.amount.toInt
+ case "gpu" =>
+ customResources(YARN_GPU_RESOURCE_CONFIG) =
execReq.amount.toString
+ case "fpga" =>
+ customResources(YARN_FPGA_RESOURCE_CONFIG) =
execReq.amount.toString
+ case rName =>
+ customResources(rName) = execReq.amount.toString
+ }
+ }
+ val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt
+ val resource = Resource.newInstance(totalMem, cores)
+ ResourceRequestHelper.setResourceRequests(customResources.toMap,
resource)
+ logDebug(s"Created resource capability: $resource")
+ rpIdToYarnResource.putIfAbsent(rp.id, resource)
+ rpIdToResourceProfile(rp.id) = rp
+ }
+ }
+ }
/**
* Request as many executors from the ResourceManager as needed to reach the
desired total. If
* the requested total is smaller than the current number of running
executors, no executors will
* be killed.
- * @param requestedTotal total number of containers requested
- * @param localityAwareTasks number of locality aware tasks to be used as
container placement hint
- * @param hostToLocalTaskCount a map of preferred hostname to possible task
counts to be used as
- * container placement hint.
+ * @param resourceProfileToTotalExecs total number of containers requested
for each
+ * ResourceProfile
+ * @param numLocalityAwareTasksPerResourceProfileId number of locality aware
tasks for each
+ * ResourceProfile id to be
used as container
+ * placement hint.
+ * @param hostToLocalTaskCount a map of preferred hostname to possible task
counts for each
+ * ResourceProfile id to be used as container
placement hint.
* @param nodeBlacklist blacklisted nodes, which is passed in to avoid
allocating new containers
* on them. It will be used to update the application
master's blacklist.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
- requestedTotal: Int,
- localityAwareTasks: Int,
- hostToLocalTaskCount: Map[String, Int],
+ resourceProfileToTotalExecs: Map[ResourceProfile, Int],
+ numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+ hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]],
nodeBlacklist: Set[String]): Boolean = synchronized {
- this.numLocalityAwareTasks = localityAwareTasks
- this.hostToLocalTaskCounts = hostToLocalTaskCount
-
- if (requestedTotal != targetNumExecutors) {
- logInfo(s"Driver requested a total number of $requestedTotal
executor(s).")
- targetNumExecutors = requestedTotal
- allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
- true
- } else {
- false
+ this.numLocalityAwareTasksPerResourceProfileId =
numLocalityAwareTasksPerResourceProfileId
+ this.hostToLocalTaskCountPerResourceProfileId =
hostToLocalTaskCountPerResourceProfileId
+
+ createYarnResourceForResourceProfile(resourceProfileToTotalExecs)
+
+ val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
+ if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
+ logInfo(s"Driver requested a total number of $numExecs executor(s) " +
+ s"for resource profile id: ${rp.id}.")
+ targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
+ allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
+ true
+ } else {
+ false
+ }
}
+ res.exists(_ == true)
}
/**
@@ -237,8 +391,9 @@ private[yarn] class YarnAllocator(
def killExecutor(executorId: String): Unit = synchronized {
executorIdToContainer.get(executorId) match {
case Some(container) if !releasedContainers.contains(container.getId) =>
+ val (_, rpId) =
containerIdToExecutorIdAndResourceProfileId(container.getId)
internalReleaseContainer(container)
- runningExecutors.remove(executorId)
+ getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
}
}
@@ -267,8 +422,8 @@ private[yarn] class YarnAllocator(
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
- runningExecutors.size,
- numExecutorsStarting.get,
+ getNumExecutorsRunning,
+ getNumExecutorsStarting,
allocateResponse.getAvailableResources))
handleAllocatedContainers(allocatedContainers.asScala)
@@ -279,108 +434,122 @@ private[yarn] class YarnAllocator(
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running
executor count: %d."
- .format(completedContainers.size, runningExecutors.size))
+ .format(completedContainers.size, getNumExecutorsRunning))
}
}
/**
* Update the set of container requests that we will sync with the RM based
on the number of
- * executors we have currently running and our target number of executors.
+ * executors we have currently running and our target number of executors
for each
+ * ResourceProfile.
*
* Visible for testing.
*/
- def updateResourceRequests(): Unit = {
- val pendingAllocate = getPendingAllocate
- val numPendingAllocate = pendingAllocate.size
- val missing = targetNumExecutors - numPendingAllocate -
- numExecutorsStarting.get - runningExecutors.size
- logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
- s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
- s"executorsStarting: ${numExecutorsStarting.get}")
-
- // Split the pending container request into three groups: locality matched
list, locality
- // unmatched list and non-locality list. Take the locality matched
container request into
- // consideration of container placement, treat as allocated containers.
- // For locality unmatched and locality free container requests, cancel
these container
- // requests, since required locality preference has been changed,
recalculating using
- // container placement strategy.
- val (localRequests, staleRequests, anyHostRequests) =
splitPendingAllocationsByLocality(
- hostToLocalTaskCounts, pendingAllocate)
-
- if (missing > 0) {
- if (log.isInfoEnabled()) {
- var requestContainerMessage = s"Will request $missing executor
container(s), each with " +
+ def updateResourceRequests(): Unit = synchronized {
+ val pendingAllocatePerResourceProfileId = getPendingAllocate
+ val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case
(rpId, targetNum) =>
+ val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
+ val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId,
Seq.empty).size
+ val running = getOrUpdateRunningExecutorForRPId(rpId).size
+ logDebug(s"Updating resource requests for ResourceProfile id: $rpId,
target: " +
+ s"$targetNum, pending: $pending, running: $running, executorsStarting:
$starting")
+ (rpId, targetNum - pending - running - starting)
+ }.toMap
+
+ missingPerProfile.foreach { case (rpId, missing) =>
+ val hostToLocalTaskCount =
+ hostToLocalTaskCountPerResourceProfileId.getOrElse(rpId, Map.empty)
+ val pendingAllocate =
pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty)
+ val numPendingAllocate = pendingAllocate.size
+ // Split the pending container request into three groups: locality
matched list, locality
+ // unmatched list and non-locality list. Take the locality matched
container request into
+ // consideration of container placement, treat as allocated containers.
+ // For locality unmatched and locality free container requests, cancel
these container
+ // requests, since required locality preference has been changed,
recalculating using
+ // container placement strategy.
+ val (localRequests, staleRequests, anyHostRequests) =
splitPendingAllocationsByLocality(
+ hostToLocalTaskCount, pendingAllocate)
+
+ if (missing > 0) {
+ val resource = rpIdToYarnResource.get(rpId)
+ if (log.isInfoEnabled()) {
+ var requestContainerMessage = s"Will request $missing executor
container(s) for " +
+ s" ResourceProfile Id: $rpId, each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of
overhead)"
- if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
- executorResourceRequests.nonEmpty) {
- requestContainerMessage ++= s" with custom resources: " +
resource.toString
+ if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
+ ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
+ requestContainerMessage ++= s" with custom resources: " +
resource.toString
+ }
+ logInfo(requestContainerMessage)
}
- logInfo(requestContainerMessage)
- }
-
- // cancel "stale" requests for locations that are no longer needed
- staleRequests.foreach { stale =>
- amClient.removeContainerRequest(stale)
- }
- val cancelledContainers = staleRequests.size
- if (cancelledContainers > 0) {
- logInfo(s"Canceled $cancelledContainers container request(s) (locality
no longer needed)")
- }
- // consider the number of new containers and cancelled stale containers
available
- val availableContainers = missing + cancelledContainers
+ // cancel "stale" requests for locations that are no longer needed
+ staleRequests.foreach { stale =>
+ amClient.removeContainerRequest(stale)
+ }
+ val cancelledContainers = staleRequests.size
+ if (cancelledContainers > 0) {
+ logInfo(s"Canceled $cancelledContainers container request(s)
(locality no longer needed)")
+ }
- // to maximize locality, include requests with no locality preference
that can be cancelled
- val potentialContainers = availableContainers + anyHostRequests.size
+ // consider the number of new containers and cancelled stale
containers available
+ val availableContainers = missing + cancelledContainers
- val containerLocalityPreferences =
containerPlacementStrategy.localityOfRequestedContainers(
- potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
- allocatedHostToContainersMap, localRequests)
+ // to maximize locality, include requests with no locality preference
that can be cancelled
+ val potentialContainers = availableContainers + anyHostRequests.size
- val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
- containerLocalityPreferences.foreach {
- case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
- newLocalityRequests += createContainerRequest(resource, nodes, racks)
- case _ =>
- }
+ val allocatedHostToContainer =
getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
+ val numLocalityAwareTasks =
numLocalityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
+ val containerLocalityPreferences =
containerPlacementStrategy.localityOfRequestedContainers(
+ potentialContainers, numLocalityAwareTasks, hostToLocalTaskCount,
+ allocatedHostToContainer, localRequests, rpIdToResourceProfile(rpId))
- if (availableContainers >= newLocalityRequests.size) {
- // more containers are available than needed for locality, fill in
requests for any host
- for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
- newLocalityRequests += createContainerRequest(resource, null, null)
- }
- } else {
- val numToCancel = newLocalityRequests.size - availableContainers
- // cancel some requests without locality preferences to schedule more
local containers
- anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
- amClient.removeContainerRequest(nonLocal)
- }
- if (numToCancel > 0) {
- logInfo(s"Canceled $numToCancel unlocalized container requests to
resubmit with locality")
+ val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+ containerLocalityPreferences.foreach {
+ case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+ newLocalityRequests += createContainerRequest(resource, nodes,
racks, rpId)
+ case _ =>
}
- }
- newLocalityRequests.foreach { request =>
- amClient.addContainerRequest(request)
- }
+ if (availableContainers >= newLocalityRequests.size) {
+ // more containers are available than needed for locality, fill in
requests for any host
+ for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
+ newLocalityRequests += createContainerRequest(resource, null,
null, rpId)
+ }
+ } else {
+ val numToCancel = newLocalityRequests.size - availableContainers
+ // cancel some requests without locality preferences to schedule
more local containers
+ anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
+ amClient.removeContainerRequest(nonLocal)
+ }
+ if (numToCancel > 0) {
+ logInfo(s"Canceled $numToCancel unlocalized container requests to
" +
+ s"resubmit with locality")
+ }
+ }
- if (log.isInfoEnabled()) {
- val (localized, anyHost) = newLocalityRequests.partition(_.getNodes()
!= null)
- if (anyHost.nonEmpty) {
- logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+ newLocalityRequests.foreach { request =>
+ amClient.addContainerRequest(request)
}
- localized.foreach { request =>
- logInfo(s"Submitted container request for host ${hostStr(request)}.")
+
+ if (log.isInfoEnabled()) {
+ val (localized, anyHost) =
newLocalityRequests.partition(_.getNodes() != null)
+ if (anyHost.nonEmpty) {
+ logInfo(s"Submitted ${anyHost.size} unlocalized container
requests.")
+ }
+ localized.foreach { request =>
+ logInfo(s"Submitted container request for host
${hostStr(request)}.")
+ }
}
+ } else if (numPendingAllocate > 0 && missing < 0) {
+ val numToCancel = math.min(numPendingAllocate, -missing)
+ logInfo(s"Canceling requests for $numToCancel executor container(s) to
have a new " +
+ s"desired total ${getOrUpdateTargetNumExecutorsForRPId(rpId)}
executors.")
+ // cancel pending allocate requests by taking locality preference into
account
+ val cancelRequests = (staleRequests ++ anyHostRequests ++
localRequests).take(numToCancel)
+ cancelRequests.foreach(amClient.removeContainerRequest)
}
- } else if (numPendingAllocate > 0 && missing < 0) {
- val numToCancel = math.min(numPendingAllocate, -missing)
- logInfo(s"Canceling requests for $numToCancel executor container(s) to
have a new desired " +
- s"total $targetNumExecutors executors.")
- // cancel pending allocate requests by taking locality preference into
account
- val cancelRequests = (staleRequests ++ anyHostRequests ++
localRequests).take(numToCancel)
- cancelRequests.foreach(amClient.removeContainerRequest)
}
}
@@ -405,8 +574,10 @@ private[yarn] class YarnAllocator(
private def createContainerRequest(
resource: Resource,
nodes: Array[String],
- racks: Array[String]): ContainerRequest = {
- new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true,
labelExpression.orNull)
+ racks: Array[String],
+ rpId: Int): ContainerRequest = {
+ new ContainerRequest(resource, nodes, racks, getContainerPriority(rpId),
+ true, labelExpression.orNull)
}
/**
@@ -499,20 +670,17 @@ private[yarn] class YarnAllocator(
location: String,
containersToUse: ArrayBuffer[Container],
remaining: ArrayBuffer[Container]): Unit = {
- // SPARK-6050: certain Yarn configurations return a virtual core count
that doesn't match the
- // request; for example, capacity scheduler + DefaultResourceCalculator.
So match on requested
- // memory, but use the asked vcore count for matching, effectively
disabling matching on vcore
- // count.
- val matchingResource =
Resource.newInstance(allocatedContainer.getResource.getMemory,
- resource.getVirtualCores)
-
- ResourceRequestHelper.setResourceRequests(executorResourceRequests,
matchingResource)
+ // Match on the exact resource we requested so there shouldn't be a
mismatch,
+ // we are relying on YARN to return a container with resources no less
then we requested.
+ // If we change this, or starting validating the container, be sure the
logic covers SPARK-6050.
+ val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
+ val resourceForRP = rpIdToYarnResource.get(rpId)
logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
s"priority: ${allocatedContainer.getPriority}, " +
- s"location: $location, resource: $matchingResource")
+ s"location: $location, resource: $resourceForRP")
val matchingRequests =
amClient.getMatchingRequests(allocatedContainer.getPriority, location,
- matchingResource)
+ resourceForRP)
// Match the allocation to a request
if (!matchingRequests.isEmpty) {
@@ -528,30 +696,38 @@ private[yarn] class YarnAllocator(
/**
* Launches executors in the allocated containers.
*/
- private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]):
Unit = {
+ private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]):
Unit = synchronized {
for (container <- containersToUse) {
+ val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
- assert(container.getResource.getMemory >= resource.getMemory)
+ val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
+ assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
- s"for executor with ID $executorId")
+ s"for executor with ID $executorId for ResourceProfile Id $rpId")
def updateInternalState(): Unit = synchronized {
- runningExecutors.add(executorId)
- numExecutorsStarting.decrementAndGet()
+ getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
+ getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
executorIdToContainer(executorId) = container
- containerIdToExecutorId(container.getId) = executorId
+ containerIdToExecutorIdAndResourceProfileId(container.getId) =
(executorId, rpId)
- val containerSet =
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+ val localallocatedHostToContainersMap =
getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
+ val containerSet =
localallocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
}
- if (runningExecutors.size() < targetNumExecutors) {
- numExecutorsStarting.incrementAndGet()
+ val rp = rpIdToResourceProfile(rpId)
+ val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
+ map(_.amount.toInt).getOrElse(executorMemory)
+ val containerCores = rp.getExecutorCores.getOrElse(defaultExecutorCores)
+ val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
+ if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
+ getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
if (launchContainers) {
launcherPool.execute(() => {
try {
@@ -562,17 +738,17 @@ private[yarn] class YarnAllocator(
driverUrl,
executorId,
executorHostname,
- executorMemory,
- executorCores,
+ containerMem,
+ containerCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully
supported
+ rp.id
).run()
updateInternalState()
} catch {
case e: Throwable =>
- numExecutorsStarting.decrementAndGet()
+ getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
if (NonFatal(e)) {
logError(s"Failed to launch executor $executorId on
container $containerId", e)
// Assigned container should be released immediately
@@ -589,24 +765,28 @@ private[yarn] class YarnAllocator(
}
} else {
logInfo(("Skip launching executorRunnable as running executors count:
%d " +
- "reached target executors count: %d.").format(
- runningExecutors.size, targetNumExecutors))
+ "reached target executors count: %d.").format(rpRunningExecs,
+ getOrUpdateTargetNumExecutorsForRPId(rpId)))
}
}
}
// Visible for testing.
- private[yarn] def processCompletedContainers(completedContainers:
Seq[ContainerStatus]): Unit = {
+ private[yarn] def processCompletedContainers(
+ completedContainers: Seq[ContainerStatus]): Unit = synchronized {
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
+ val (_, rpId) =
containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId,
+ ("", DEFAULT_RESOURCE_PROFILE_ID))
val alreadyReleased = releasedContainers.remove(containerId)
val hostOpt = allocatedContainerToHostMap.get(containerId)
val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of
allocating.
- containerIdToExecutorId.get(containerId) match {
- case Some(executorId) => runningExecutors.remove(executorId)
+ containerIdToExecutorIdAndResourceProfileId.get(containerId) match {
+ case Some((executorId, _)) =>
+ getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
case None => logWarning(s"Cannot find executorId for container:
${containerId.toString}")
}
@@ -679,19 +859,19 @@ private[yarn] class YarnAllocator(
for {
host <- hostOpt
- containerSet <- allocatedHostToContainersMap.get(host)
+ containerSet <-
getOrUpdateAllocatedHostToContainersMapForRPId(rpId).get(host)
} {
containerSet.remove(containerId)
if (containerSet.isEmpty) {
- allocatedHostToContainersMap.remove(host)
+ getOrUpdateAllocatedHostToContainersMapForRPId(rpId).remove(host)
} else {
- allocatedHostToContainersMap.update(host, containerSet)
+ getOrUpdateAllocatedHostToContainersMapForRPId(rpId).update(host,
containerSet)
}
allocatedContainerToHostMap.remove(containerId)
}
- containerIdToExecutorId.remove(containerId).foreach { eid =>
+ containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach
{ case (eid, _) =>
executorIdToContainer.remove(eid)
pendingLossReasonRequests.remove(eid) match {
case Some(pendingRequests) =>
@@ -737,12 +917,14 @@ private[yarn] class YarnAllocator(
}
}
- private def internalReleaseContainer(container: Container): Unit = {
+ private def internalReleaseContainer(container: Container): Unit =
synchronized {
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
}
- private[yarn] def getNumUnexpectedContainerRelease =
numUnexpectedContainerRelease
+ private[yarn] def getNumUnexpectedContainerRelease: Long = synchronized {
+ numUnexpectedContainerRelease
+ }
private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
pendingLossReasonRequests.size
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index f8bbc39..e428bab 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -130,12 +130,8 @@ private[spark] abstract class YarnSchedulerBackend(
val filteredRPHostToLocalTaskCount = rpHostToLocalTaskCount.map { case
(rpid, v) =>
(rpid, v.filter { case (host, count) => !nodeBlacklist.contains(host) })
}
- // TODO - default everything to default profile until YARN pieces
- val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf)
- val hostToLocalTaskCount =
filteredRPHostToLocalTaskCount.getOrElse(defaultProf.id, Map.empty)
- val localityAwareTasks =
numLocalityAwareTasksPerResourceProfileId.getOrElse(defaultProf.id, 0)
- val numExecutors = resourceProfileToTotalExecs.getOrElse(defaultProf, 0)
- RequestExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount,
nodeBlacklist)
+ RequestExecutors(resourceProfileToTotalExecs,
numLocalityAwareTasksPerResourceProfileId,
+ filteredRPHostToLocalTaskCount, nodeBlacklist)
}
/**
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
index 29f1c05..d83a0d2 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -17,10 +17,13 @@
package org.apache.spark.deploy.yarn
+import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
+import org.apache.spark.resource.ResourceProfile
class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with
BeforeAndAfterEach {
@@ -28,7 +31,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite
with Matchers with B
import yarnAllocatorSuite._
def createContainerRequest(nodes: Array[String]): ContainerRequest =
- new ContainerRequest(containerResource, nodes, null,
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+ new ContainerRequest(containerResource, nodes, null,
Priority.newInstance(1))
override def beforeEach(): Unit = {
yarnAllocatorSuite.beforeEach()
@@ -38,18 +41,22 @@ class ContainerPlacementStrategySuite extends SparkFunSuite
with Matchers with B
yarnAllocatorSuite.afterEach()
}
+ val defaultResourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+
test("allocate locality preferred containers with enough resource and no
matched existed " +
"containers") {
// 1. All the locations of current containers cannot satisfy the new
requirements
// 2. Current requested container number can fully satisfy the pending
tasks.
- val handler = createAllocator(2)
+ val (handler, allocatorConf) = createAllocator(2)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(createContainer("host1"),
createContainer("host2")))
+ ResourceProfile.clearDefaultProfile
+ val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities =
handler.containerPlacementStrategy.localityOfRequestedContainers(
3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
- handler.allocatedHostToContainersMap, Seq.empty)
+ handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
Seq.empty, rp)
assert(localities.map(_.nodes) === Array(
Array("host3", "host4", "host5"),
@@ -62,7 +69,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite
with Matchers with B
// 1. Parts of current containers' locations can satisfy the new
requirements
// 2. Current requested container number can fully satisfy the pending
tasks.
- val handler = createAllocator(3)
+ val (handler, allocatorConf) = createAllocator(3)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@@ -70,9 +77,12 @@ class ContainerPlacementStrategySuite extends SparkFunSuite
with Matchers with B
createContainer("host2")
))
+ ResourceProfile.clearDefaultProfile
+ val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
+
val localities =
handler.containerPlacementStrategy.localityOfRequestedContainers(
3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
- handler.allocatedHostToContainersMap, Seq.empty)
+ handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
Seq.empty, rp)
assert(localities.map(_.nodes) ===
Array(null, Array("host2", "host3"), Array("host2", "host3")))
@@ -83,7 +93,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite
with Matchers with B
// 1. Parts of current containers' locations can satisfy the new
requirements
// 2. Current requested container number cannot fully satisfy the pending
tasks.
- val handler = createAllocator(3)
+ val (handler, allocatorConf) = createAllocator(3)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@@ -91,9 +101,11 @@ class ContainerPlacementStrategySuite extends SparkFunSuite
with Matchers with B
createContainer("host2")
))
+ ResourceProfile.clearDefaultProfile
+ val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities =
handler.containerPlacementStrategy.localityOfRequestedContainers(
1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
- handler.allocatedHostToContainersMap, Seq.empty)
+ handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
Seq.empty, rp)
assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
}
@@ -101,7 +113,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite
with Matchers with B
test("allocate locality preferred containers with fully matched containers")
{
// Current containers' locations can fully satisfy the new requirements
- val handler = createAllocator(5)
+ val (handler, allocatorConf) = createAllocator(5)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@@ -111,9 +123,11 @@ class ContainerPlacementStrategySuite extends
SparkFunSuite with Matchers with B
createContainer("host3")
))
+ ResourceProfile.clearDefaultProfile
+ val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities =
handler.containerPlacementStrategy.localityOfRequestedContainers(
3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
- handler.allocatedHostToContainersMap, Seq.empty)
+ handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
Seq.empty, rp)
assert(localities.map(_.nodes) === Array(null, null, null))
}
@@ -121,18 +135,21 @@ class ContainerPlacementStrategySuite extends
SparkFunSuite with Matchers with B
test("allocate containers with no locality preference") {
// Request new container without locality preference
- val handler = createAllocator(2)
+ val (handler, allocatorConf) = createAllocator(2)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(createContainer("host1"),
createContainer("host2")))
+ ResourceProfile.clearDefaultProfile
+ val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities =
handler.containerPlacementStrategy.localityOfRequestedContainers(
- 1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
+ 1, 0, Map.empty,
+ handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
Seq.empty, rp)
assert(localities.map(_.nodes) === Array(null))
}
test("allocate locality preferred containers by considering the localities
of pending requests") {
- val handler = createAllocator(3)
+ val (handler, allocatorConf) = createAllocator(3)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@@ -144,9 +161,12 @@ class ContainerPlacementStrategySuite extends
SparkFunSuite with Matchers with B
createContainerRequest(Array("host2", "host3")),
createContainerRequest(Array("host1", "host4")))
+ ResourceProfile.clearDefaultProfile
+ val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities =
handler.containerPlacementStrategy.localityOfRequestedContainers(
1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
- handler.allocatedHostToContainersMap, pendingAllocationRequests)
+ handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
+ pendingAllocationRequests, rp)
assert(localities.map(_.nodes) === Array(Array("host3")))
}
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
index b7f2565..7278517 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Mockito._
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.resource.ResourceProfile
class LocalityPlacementStrategySuite extends SparkFunSuite {
@@ -58,7 +59,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
val resource = Resource.newInstance(8 * 1024, 4)
val strategy = new LocalityPreferredContainerPlacementStrategy(new
SparkConf(),
- yarnConf, resource, new MockResolver())
+ yarnConf, new MockResolver())
val totalTasks = 32 * 1024
val totalContainers = totalTasks / 16
@@ -75,9 +76,10 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
containers.drop(count * i).take(i).foreach { c => hostContainers += c }
hostToContainerMap(host) = hostContainers
}
+ val rp = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
strategy.localityOfRequestedContainers(containers.size * 2, totalTasks,
hosts,
- hostToContainerMap, Nil)
+ hostToContainerMap, Nil, rp)
}
}
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6216d47..2003d0b 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
@@ -32,9 +33,9 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config._
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile,
TaskResourceRequests}
import org.apache.spark.resource.ResourceUtils.{AMOUNT, GPU}
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.RpcEndpointRef
@@ -69,6 +70,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers
with BeforeAndAfter
var containerNum = 0
+ // priority has to be 0 to match default profile id
+ val RM_REQUEST_PRIORITY = Priority.newInstance(0)
+ val defaultRPId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+ val defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+
override def beforeEach(): Unit = {
super.beforeEach()
rmClient = AMRMClient.createAMRMClient()
@@ -93,7 +99,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers
with BeforeAndAfter
def createAllocator(
maxExecutors: Int = 5,
rmClient: AMRMClient[ContainerRequest] = rmClient,
- additionalConfigs: Map[String, String] = Map()): YarnAllocator = {
+ additionalConfigs: Map[String, String] = Map()): (YarnAllocator,
SparkConf) = {
val args = Array(
"--jar", "somejar.jar",
"--class", "SomeClass")
@@ -107,7 +113,7 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
sparkConfClone.set(name, value)
}
- new YarnAllocator(
+ val allocator = new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
conf,
@@ -118,16 +124,18 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
Map(),
new MockResolver(),
clock)
+ (allocator, sparkConfClone)
}
def createContainer(
host: String,
containerNumber: Int = containerNum,
- resource: Resource = containerResource): Container = {
+ resource: Resource = containerResource,
+ priority: Priority = RM_REQUEST_PRIORITY): Container = {
val containerId: ContainerId = ContainerId.newContainerId(appAttemptId,
containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
- Container.newInstance(containerId, nodeId, "", resource,
RM_REQUEST_PRIORITY, null)
+ Container.newInstance(containerId, nodeId, "", resource, priority, null)
}
def createContainers(hosts: Seq[String], containerIds: Seq[Int]):
Seq[Container] = {
@@ -145,20 +153,108 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
test("single container allocated") {
// request a single container and receive it
- val handler = createAllocator(1)
+ val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (1)
+ handler.getNumContainersPendingAllocate should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be
("host1")
- handler.allocatedHostToContainersMap.get("host1").get should contain
(container.getId)
+ val hostTocontainer =
handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+ hostTocontainer.get("host1").get should contain(container.getId)
+
+ val size = rmClient.getMatchingRequests(container.getPriority, "host1",
containerResource).size
+ size should be (0)
+ }
+
+ test("single container allocated with ResourceProfile") {
+ assume(isYarnResourceTypesAvailable())
+ val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG)
+ ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+ // create default profile so we get a different id to test below
+ val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+ val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+ val taskReq = new TaskResourceRequests().resource("gpu", 1)
+ val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+ // request a single container and receive it
+ val (handler, _) = createAllocator(0)
+
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof
-> 1)
+ val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id
-> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
+
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumContainersPendingAllocate should be (1)
+
+ val container = createContainer("host1", priority =
Priority.newInstance(rprof.id))
+ handler.handleAllocatedContainers(Array(container))
+
+ handler.getNumExecutorsRunning should be (1)
+ handler.allocatedContainerToHostMap.get(container.getId).get should be
("host1")
+ val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
+ hostTocontainer.get("host1").get should contain(container.getId)
val size = rmClient.getMatchingRequests(container.getPriority, "host1",
containerResource).size
size should be (0)
+
+ ResourceProfile.reInitDefaultProfile(sparkConf)
+ }
+
+ test("multiple containers allocated with ResourceProfiles") {
+ assume(isYarnResourceTypesAvailable())
+ val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG,
YARN_FPGA_RESOURCE_CONFIG)
+ ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+ // create default profile so we get a different id to test below
+ val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+ val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+ val taskReq = new TaskResourceRequests().resource("gpu", 1)
+ val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+
+ val execReq2 = new
ExecutorResourceRequests().memory("8g").resource("fpga", 2)
+ val taskReq2 = new TaskResourceRequests().resource("fpga", 1)
+ val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests)
+
+
+ // request a single container and receive it
+ val (handler, _) = createAllocator(1)
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof
-> 1, rprof2 -> 2)
+ val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id
-> 0, rprof2.id -> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
+
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumContainersPendingAllocate should be (3)
+
+ val containerResourcerp2 = Resource.newInstance(10240, 5)
+
+ val container = createContainer("host1", priority =
Priority.newInstance(rprof.id))
+ val container2 = createContainer("host2", resource = containerResourcerp2,
+ priority = Priority.newInstance(rprof2.id))
+ val container3 = createContainer("host3", resource = containerResourcerp2,
+ priority = Priority.newInstance(rprof2.id))
+ handler.handleAllocatedContainers(Array(container, container2, container3))
+
+ handler.getNumExecutorsRunning should be (3)
+ handler.allocatedContainerToHostMap.get(container.getId).get should be
("host1")
+ handler.allocatedContainerToHostMap.get(container2.getId).get should be
("host2")
+ handler.allocatedContainerToHostMap.get(container3.getId).get should be
("host3")
+
+ val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
+ hostTocontainer.get("host1").get should contain(container.getId)
+ val hostTocontainer2 =
handler.allocatedHostToContainersMapPerRPId(rprof2.id)
+ hostTocontainer2.get("host2").get should contain(container2.getId)
+ hostTocontainer2.get("host3").get should contain(container3.getId)
+
+ val size = rmClient.getMatchingRequests(container.getPriority, "host1",
containerResource).size
+ size should be (0)
+
+ ResourceProfile.reInitDefaultProfile(sparkConf)
}
test("custom resource requested from yarn") {
@@ -166,16 +262,16 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
- val handler = createAllocator(1, mockAmClient,
+ val (handler, _) = createAllocator(1, mockAmClient,
Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
handler.updateResourceRequests()
- val container = createContainer("host1", resource = handler.resource)
+ val container = createContainer("host1", resource =
handler.defaultResource)
handler.handleAllocatedContainers(Array(container))
// get amount of memory and vcores from resource, so effectively skipping
their validation
- val expectedResources = Resource.newInstance(handler.resource.getMemory(),
- handler.resource.getVirtualCores)
+ val expectedResources =
Resource.newInstance(handler.defaultResource.getMemory(),
+ handler.defaultResource.getVirtualCores)
setResourceRequests(Map("gpu" -> "2G"), expectedResources)
val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
@@ -195,10 +291,10 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
Map(EXECUTOR_GPU_ID.amountConf -> "3",
EXECUTOR_FPGA_ID.amountConf -> "2",
madeupConfigName -> "5")
- val handler = createAllocator(1, mockAmClient, sparkResources)
+ val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
handler.updateResourceRequests()
- val yarnRInfo = ResourceRequestTestHelper.getResources(handler.resource)
+ val yarnRInfo =
ResourceRequestTestHelper.getResources(handler.defaultResource)
val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value)
).toMap
assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
@@ -210,17 +306,18 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
test("container should not be created if requested number if met") {
// request a single container and receive it
- val handler = createAllocator(1)
+ val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (1)
+ handler.getNumContainersPendingAllocate should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be
("host1")
- handler.allocatedHostToContainersMap.get("host1").get should contain
(container.getId)
+ val hostTocontainer =
handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+ hostTocontainer.get("host1").get should contain(container.getId)
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container2))
@@ -229,10 +326,10 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
test("some containers allocated") {
// request a few containers and receive some of them
- val handler = createAllocator(4)
+ val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (4)
+ handler.getNumContainersPendingAllocate should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host1")
@@ -243,16 +340,17 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
handler.allocatedContainerToHostMap.get(container1.getId).get should be
("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be
("host1")
handler.allocatedContainerToHostMap.get(container3.getId).get should be
("host2")
- handler.allocatedHostToContainersMap.get("host1").get should contain
(container1.getId)
- handler.allocatedHostToContainersMap.get("host1").get should contain
(container2.getId)
- handler.allocatedHostToContainersMap.get("host2").get should contain
(container3.getId)
+ val hostTocontainer =
handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+ hostTocontainer.get("host1").get should contain(container1.getId)
+ hostTocontainer.get("host1").get should contain (container2.getId)
+ hostTocontainer.get("host2").get should contain (container3.getId)
}
test("receive more containers than requested") {
- val handler = createAllocator(2)
+ val (handler, _) = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (2)
+ handler.getNumContainersPendingAllocate should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -263,42 +361,52 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
handler.allocatedContainerToHostMap.get(container1.getId).get should be
("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be
("host2")
handler.allocatedContainerToHostMap.contains(container3.getId) should be
(false)
- handler.allocatedHostToContainersMap.get("host1").get should contain
(container1.getId)
- handler.allocatedHostToContainersMap.get("host2").get should contain
(container2.getId)
- handler.allocatedHostToContainersMap.contains("host4") should be (false)
+ val hostTocontainer =
handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+ hostTocontainer.get("host1").get should contain(container1.getId)
+ hostTocontainer.get("host2").get should contain (container2.getId)
+ hostTocontainer.contains("host4") should be (false)
}
test("decrease total requested executors") {
- val handler = createAllocator(4)
+ val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (4)
+ handler.getNumContainersPendingAllocate should be (4)
- handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty,
Set.empty)
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
+ val numLocalityAwareTasksPerResourceProfileId =
mutable.HashMap(defaultRPId -> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
- handler.getPendingAllocate.size should be (3)
+ handler.getNumContainersPendingAllocate should be (3)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be
("host1")
- handler.allocatedHostToContainersMap.get("host1").get should contain
(container.getId)
+ val hostTocontainer =
handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+ hostTocontainer.get("host1").get should contain(container.getId)
- handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty,
Set.empty)
+ resourceProfileToTotalExecs(defaultRP) = 2
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
- handler.getPendingAllocate.size should be (1)
+ handler.getNumContainersPendingAllocate should be (1)
}
test("decrease total requested executors to less than currently running") {
- val handler = createAllocator(4)
+ val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (4)
+ handler.getNumContainersPendingAllocate should be (4)
- handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty,
Set.empty)
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
+ val numLocalityAwareTasksPerResourceProfileId =
mutable.HashMap(defaultRPId -> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
- handler.getPendingAllocate.size should be (3)
+ handler.getNumContainersPendingAllocate should be (3)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -306,23 +414,28 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (2)
- handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty,
Set.empty)
+ resourceProfileToTotalExecs(defaultRP) = 1
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
- handler.getPendingAllocate.size should be (0)
+ handler.getNumContainersPendingAllocate should be (0)
handler.getNumExecutorsRunning should be (2)
}
test("kill executors") {
- val handler = createAllocator(4)
+ val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (4)
+ handler.getNumContainersPendingAllocate should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
- handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty,
Set.empty)
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
+ val numLocalityAwareTasksPerResourceProfileId =
mutable.HashMap(defaultRPId -> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id
) }
val statuses = Seq(container1, container2).map { c =>
@@ -331,20 +444,20 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
handler.updateResourceRequests()
handler.processCompletedContainers(statuses)
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (1)
+ handler.getNumContainersPendingAllocate should be (1)
}
test("kill same executor multiple times") {
- val handler = createAllocator(2)
+ val (handler, _) = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (2)
+ handler.getNumContainersPendingAllocate should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.getNumExecutorsRunning should be (2)
- handler.getPendingAllocate.size should be (0)
+ handler.getNumContainersPendingAllocate should be (0)
val executorToKill = handler.executorIdToContainer.keys.head
handler.killExecutor(executorToKill)
@@ -353,22 +466,25 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
handler.killExecutor(executorToKill)
handler.killExecutor(executorToKill)
handler.getNumExecutorsRunning should be (1)
- handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty,
Set.empty)
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
+ val numLocalityAwareTasksPerResourceProfileId =
mutable.HashMap(defaultRPId -> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
- handler.getPendingAllocate.size should be (1)
+ handler.getNumContainersPendingAllocate should be (1)
}
test("process same completed container multiple times") {
- val handler = createAllocator(2)
+ val (handler, _) = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (2)
+ handler.getNumContainersPendingAllocate should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.getNumExecutorsRunning should be (2)
- handler.getPendingAllocate.size should be (0)
+ handler.getNumContainersPendingAllocate should be (0)
val statuses = Seq(container1, container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE,
"Finished", 0)
@@ -379,16 +495,19 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
}
test("lost executor removed from backend") {
- val handler = createAllocator(4)
+ val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (4)
+ handler.getNumContainersPendingAllocate should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
- handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(),
Set.empty)
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
+ val numLocalityAwareTasksPerResourceProfileId =
mutable.HashMap(defaultRPId -> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
val statuses = Seq(container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE,
"Failed", -1)
@@ -397,7 +516,7 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
handler.processCompletedContainers(statuses)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (2)
+ handler.getNumContainersPendingAllocate should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}
@@ -406,28 +525,35 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
// Internally we track the set of blacklisted nodes, but yarn wants us to
send *changes*
// to the blacklist. This makes sure we are sending the right updates.
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
- val handler = createAllocator(4, mockAmClient)
- handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(),
Set("hostA"))
+ val (handler, _) = createAllocator(4, mockAmClient)
+ val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
+ val numLocalityAwareTasksPerResourceProfileId =
mutable.HashMap(defaultRPId -> 0)
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set("hostA"))
verify(mockAmClient).updateBlacklist(Seq("hostA").asJava,
Seq[String]().asJava)
val blacklistedNodes = Set(
"hostA",
"hostB"
)
- handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(),
blacklistedNodes)
- verify(mockAmClient).updateBlacklist(Seq("hostB").asJava,
Seq[String]().asJava)
- handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(),
Set.empty)
+ resourceProfileToTotalExecs(defaultRP) = 2
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map(), blacklistedNodes)
+ verify(mockAmClient).updateBlacklist(Seq("hostB").asJava,
Seq[String]().asJava)
+ resourceProfileToTotalExecs(defaultRP) = 3
+
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA",
"hostB").asJava)
}
test("window based failure executor counting") {
sparkConf.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100 * 1000L)
- val handler = createAllocator(4)
+ val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getPendingAllocate.size should be (4)
+ handler.getNumContainersPendingAllocate should be (4)
val containers = Seq(
createContainer("host1"),
@@ -468,7 +594,7 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
val rmClientSpy = spy(rmClient)
val maxExecutors = 11
- val handler = createAllocator(
+ val (handler, _) = createAllocator(
maxExecutors,
rmClientSpy,
Map(
@@ -525,9 +651,9 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers with BeforeAndAfter
try {
sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
- val allocator = createAllocator(maxExecutors = 1,
+ val (handler, _) = createAllocator(maxExecutors = 1,
additionalConfigs = Map(EXECUTOR_MEMORY.key ->
executorMemory.toString))
- val memory = allocator.resource.getMemory
+ val memory = handler.defaultResource.getMemory
assert(memory ==
executorMemory + offHeapMemoryInMB +
YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
} finally {
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index c0c6fff..9003c2f 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -51,9 +51,8 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with
MockitoSugar with Loc
private class TestYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc:
SparkContext)
extends YarnSchedulerBackend(scheduler, sc) {
- def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit
= {
- this.rpHostToLocalTaskCount =
Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
- hostToLocalTaskCount)
+ def setHostToLocalTaskCount(hostToLocalTaskCount: Map[Int, Map[String,
Int]]): Unit = {
+ this.rpHostToLocalTaskCount = hostToLocalTaskCount
}
}
@@ -64,21 +63,24 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with
MockitoSugar with Loc
val yarnSchedulerBackendExtended = new TestYarnSchedulerBackend(sched, sc)
yarnSchedulerBackend = yarnSchedulerBackendExtended
val ser = new JavaSerializer(sc.conf).newInstance()
+ val defaultResourceProf =
ResourceProfile.getOrCreateDefaultProfile(sc.getConf)
for {
blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
numRequested <- 0 until 10
hostToLocalCount <- IndexedSeq(
- Map[String, Int](),
- Map("a" -> 1, "b" -> 2)
+ Map(defaultResourceProf.id -> Map.empty[String, Int]),
+ Map(defaultResourceProf.id -> Map("a" -> 1, "b" -> 2))
)
} {
yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
sched.setNodeBlacklist(blacklist)
- val numReq = Map(ResourceProfile.getOrCreateDefaultProfile(sc.getConf)
-> numRequested)
- val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numReq)
- assert(req.requestedTotal === numRequested)
+ val request = Map(defaultResourceProf -> numRequested)
+ val req = yarnSchedulerBackendExtended.prepareRequestExecutors(request)
+ assert(req.resourceProfileToTotalExecs(defaultResourceProf) ===
numRequested)
assert(req.nodeBlacklist === blacklist)
- assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
+ val hosts =
+
req.hostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID).keySet
+ assert(hosts.intersect(blacklist).isEmpty)
// Serialize to make sure serialization doesn't throw an error
ser.serialize(req)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]