This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 474b1bb [SPARK-29154][CORE] Update Spark scheduler for stage level
scheduling
474b1bb is described below
commit 474b1bb5c2bce2f83c4dd8e19b9b7c5b3aebd6c4
Author: Thomas Graves <[email protected]>
AuthorDate: Thu Mar 26 09:46:36 2020 -0500
[SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
### What changes were proposed in this pull request?
These are the core scheduler changes to support stage level scheduling.
The main changes here include modifying the DAGScheduler to look at the
ResourceProfiles associated with an RDD and have those applied inside the
scheduler.
Currently, if multiple RDDs in a stage have conflicting ResourceProfiles we
throw an error; the logic to allow merging them will be added in SPARK-29153.
I added interfaces to RDD to set and get the ResourceProfile so that I could
add unit tests for the scheduler. These are marked as private for now until
we finish the feature and will be exposed in SPARK-29150. If you think this
is confusing I can remove them along with the tests and add them back later.
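
For reference, the new (currently private[spark]) hooks are exercised by the
new DAGSchedulerSuite tests roughly as follows. This is a minimal sketch, not
the final public API (that comes with SPARK-29150); `sc` is assumed to be a
SparkContext and the builder classes come from `org.apache.spark.resource`:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, TaskResourceRequests}
import org.apache.spark.resource.ResourceProfileBuilder

// Build a profile asking for 4-core executors that run 1-cpu tasks.
val ereqs = new ExecutorResourceRequests().cores(4)
val treqs = new TaskResourceRequests().cpus(1)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build

// Attach the profile to an RDD; the DAGScheduler picks it up when it creates
// the stage containing this RDD.
val rdd = sc.parallelize(1 to 10).map(x => (x, x)).withResources(rp)
assert(rdd.getResourceProfile() == rp)
```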
I modified the task scheduler to only schedule task sets on executors whose
resource profile exactly matches the task set's. It then checks those
executors to make sure the currently available resources meet the task's
needs before assigning it. As part of this I changed the way we do the
custom resource assignment.
Other changes here include passing the cpus per task around so that we can
properly account for them. Previously we just used the single global config,
but now the value can change based on the ResourceProfile.
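
As a rough illustration of the fallback, the new private helper
ResourceProfile.getTaskCpusOrDefaultForProfile uses the profile's task cpus
when set and otherwise the global spark.task.cpus. A self-contained sketch of
that resolution (names here are illustrative, not Spark's):

```scala
// Sketch of the per-profile task cpu resolution added in this patch.
// profileTaskCpus is the profile's optional "cpus" task requirement;
// globalCpusPerTask is the spark.task.cpus value.
def taskCpusFor(profileTaskCpus: Option[Int], globalCpusPerTask: Int): Int =
  profileTaskCpus.getOrElse(globalCpusPerTask)

// A profile requesting 2 cpus per task overrides spark.task.cpus=1;
// a profile with no cpu request falls back to the global config.
assert(taskCpusFor(Some(2), globalCpusPerTask = 1) == 2)
assert(taskCpusFor(None, globalCpusPerTask = 1) == 1)
```

The scheduler then decrements an offer's available cpus by this per-profile
value when it launches a task, instead of the single global CPUS_PER_TASK.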
I removed the exceptions that required cores to be the limiting resource.
With this change, all the places I found that used executor cores / task cpus
as slots have been updated to use the ResourceProfile logic and determine
which resource is limiting.
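
To make the "limiting resource" idea concrete, here is a hedged sketch of how
slots per executor can be counted when a custom resource, not cpus, is the
bottleneck. It only mirrors the spirit of the new
TaskSchedulerImpl.calculateAvailableSlots; the real code relies on the
ResourceProfile's precomputed limiting resource and handles fractional
amounts via slots per address.

```scala
// Illustrative only: count how many tasks an executor offer can run when every
// resource, not just cpus, is taken into account. The executor's slot count is
// the minimum over cpus and each custom resource.
def slotsForExecutor(
    availableCpus: Int,
    taskCpus: Int,
    availableResources: Map[String, Int],  // e.g. Map("gpu" -> 2) addresses free
    taskResources: Map[String, Int]): Int = {  // e.g. Map("gpu" -> 1) per task
  val cpuSlots = availableCpus / taskCpus
  val resourceSlots = taskResources.map { case (name, perTask) =>
    availableResources.getOrElse(name, 0) / perTask
  }
  (Iterator(cpuSlots) ++ resourceSlots).min
}

// 8 cores at 1 cpu/task would allow 8 tasks, but 2 GPUs at 1 gpu/task limit it to 2.
assert(slotsForExecutor(8, 1, Map("gpu" -> 2), Map("gpu" -> 1)) == 2)
```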
### Why are the changes needed?
Stage level scheduling feature
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit tests and lots of manual testing
Closes #27773 from tgravescs/SPARK-29154.
Lead-authored-by: Thomas Graves <[email protected]>
Co-authored-by: Thomas Graves <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
---
.../main/scala/org/apache/spark/SparkContext.scala | 27 +--
.../org/apache/spark/internal/config/Tests.scala | 9 +
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 27 +++
.../apache/spark/resource/ResourceProfile.scala | 42 +++--
.../spark/resource/ResourceProfileManager.scala | 11 +-
.../org/apache/spark/resource/ResourceUtils.scala | 13 +-
.../spark/resource/TaskResourceRequests.scala | 2 +-
.../org/apache/spark/scheduler/DAGScheduler.scala | 70 +++++---
.../apache/spark/scheduler/SchedulerBackend.scala | 8 +-
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 178 ++++++++++++++-----
.../scala/org/apache/spark/scheduler/TaskSet.scala | 3 +-
.../apache/spark/scheduler/TaskSetManager.scala | 32 ++--
.../org/apache/spark/scheduler/WorkerOffer.scala | 5 +-
.../cluster/CoarseGrainedSchedulerBackend.scala | 24 ++-
.../scheduler/local/LocalSchedulerBackend.scala | 9 +-
.../deploy/StandaloneDynamicAllocationSuite.scala | 1 -
.../CoarseGrainedExecutorBackendSuite.scala | 4 +-
.../CoarseGrainedSchedulerBackendSuite.scala | 13 +-
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 +++++++++++-
.../scheduler/ExternalClusterManagerSuite.scala | 3 +-
.../org/apache/spark/scheduler/FakeTask.scala | 31 +++-
.../org/apache/spark/scheduler/PoolSuite.scala | 4 +-
.../scheduler/SchedulerIntegrationSuite.scala | 5 +-
.../spark/scheduler/TaskSchedulerImplSuite.scala | 192 ++++++++++++++++++++-
.../spark/scheduler/TaskSetManagerSuite.scala | 91 ++++++----
.../mesos/MesosFineGrainedSchedulerBackend.scala | 3 +-
26 files changed, 704 insertions(+), 218 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cdb98db..588e7dc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1597,13 +1597,17 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
- * Get the max number of tasks that can be concurrent launched currently.
+ * Get the max number of tasks that can be concurrent launched based on the
ResourceProfile
+ * being used.
* Note that please don't cache the value returned by this method, because
the number can change
* due to add/remove executors.
*
+ * @param rp ResourceProfile which to use to calculate max concurrent tasks.
* @return The max number of tasks that can be concurrent launched currently.
*/
- private[spark] def maxNumConcurrentTasks(): Int =
schedulerBackend.maxNumConcurrentTasks()
+ private[spark] def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+ schedulerBackend.maxNumConcurrentTasks(rp)
+ }
/**
* Update the cluster manager on our scheduling needs. Three bits of
information are included
@@ -2764,23 +2768,10 @@ object SparkContext extends Logging {
// others its checked in ResourceProfile.
def checkResourcesPerTask(executorCores: Int): Unit = {
val taskCores = sc.conf.get(CPUS_PER_TASK)
- validateTaskCpusLargeEnough(executorCores, taskCores)
- val defaultProf = sc.resourceProfileManager.defaultResourceProfile
- // TODO - this is temporary until all of stage level scheduling feature
is integrated,
- // fail if any other resource limiting due to dynamic allocation and
scheduler using
- // slots based on cores
- val cpuSlots = executorCores/taskCores
- val limitingResource = defaultProf.limitingResource(sc.conf)
- if (limitingResource.nonEmpty &&
!limitingResource.equals(ResourceProfile.CPUS) &&
- defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
- throw new IllegalArgumentException("The number of slots on an executor
has to be " +
- "limited by the number of cores, otherwise you waste resources and "
+
- "some scheduling doesn't work properly. Your configuration has " +
- s"core/task cpu slots = ${cpuSlots} and " +
- s"${limitingResource} = " +
- s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your
configuration " +
- "so that all resources require same number of executor slots.")
+ if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) {
+ validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores)
}
+ val defaultProf = sc.resourceProfileManager.defaultResourceProfile
ResourceUtils.warnOnWastedResources(defaultProf, sc.conf,
Some(executorCores))
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
index 15610e8..33dac04 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -73,4 +73,13 @@ private[spark] object Tests {
.booleanConf
.createWithDefault(false)
+ // This configuration is used for unit tests to allow skipping the task cpus
to cores validation
+ // to allow emulating standalone mode behavior while running in local mode.
Standalone mode
+ // by default doesn't specify a number of executor cores, it just uses all
the ones available
+ // on the host.
+ val SKIP_VALIDATE_CORES_TESTING =
+ ConfigBuilder("spark.testing.skipValidateCores")
+ .booleanConf
+ .createWithDefault(false)
+
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a26b579..f59c587 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -42,6 +42,7 @@ import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
@@ -1716,11 +1717,37 @@ abstract class RDD[T: ClassTag](
@Since("2.4.0")
def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
+ /**
+ * Specify a ResourceProfile to use when calculating this RDD. This is only
supported on
+ * certain cluster managers and currently requires dynamic allocation to be
enabled.
+ * It will result in new executors with the resources specified being
acquired to
+ * calculate the RDD.
+ */
+ // PRIVATE for now, added for testing purposes, will be made public with
SPARK-29150
+ @Experimental
+ @Since("3.0.0")
+ private[spark] def withResources(rp: ResourceProfile): this.type = {
+ resourceProfile = Option(rp)
+ sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
+ this
+ }
+
+ /**
+ * Get the ResourceProfile specified with this RDD or null if it wasn't
specified.
+ * @return the user specified ResourceProfile or null (for Java
compatibility) if
+ * none was specified
+ */
+ // PRIVATE for now, added for testing purposes, will be made public with
SPARK-29150
+ @Experimental
+ @Since("3.0.0")
+ private[spark] def getResourceProfile(): ResourceProfile =
resourceProfile.getOrElse(null)
+
// =======================================================================
// Other internal methods and fields
// =======================================================================
private var storageLevel: StorageLevel = StorageLevel.NONE
+ private var resourceProfile: Option[ResourceProfile] = None
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSite = sc.getCallSite()
diff --git
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 844026d..96c456e 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -76,6 +76,21 @@ class ResourceProfile(
taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
}
+ /*
+ * This function takes into account fractional amounts for the task resource
requirement.
+ * Spark only supports fractional amounts < 1 to basically allow for
multiple tasks
+ * to use the same resource address.
+ * The way the scheduler handles this is it adds the same address the number
of slots per
+ * address times and then the amount becomes 1. This way it re-uses the same
address
+ * the correct number of times. ie task requirement amount=0.25 ->
addrs["0", "0", "0", "0"]
+ * and scheduler task amount=1. See ResourceAllocator.slotsPerAddress.
+ */
+ private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = {
+ val taskAmount = taskResources.getOrElse(resource,
+ throw new SparkException(s"Resource $resource doesn't exist in profile
id: $id"))
+ if (taskAmount.amount < 1) 1 else taskAmount.amount.toInt
+ }
+
private[spark] def getNumSlotsPerAddress(resource: String, sparkConf:
SparkConf): Int = {
_executorResourceSlotsPerAddr.getOrElse {
calculateTasksAndLimitingResource(sparkConf)
@@ -137,7 +152,7 @@ class ResourceProfile(
assert(cpusPerTask > 0, "CPUs per task configuration has to be > 0")
val coresPerExecutor =
getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
_coresLimitKnown = true
- ResourceUtils.validateTaskCpusLargeEnough(coresPerExecutor, cpusPerTask)
+ ResourceUtils.validateTaskCpusLargeEnough(sparkConf, coresPerExecutor,
cpusPerTask)
val tasksBasedOnCores = coresPerExecutor / cpusPerTask
// Note that if the cores per executor aren't set properly this
calculation could be off,
// we default it to just be 1 in order to allow checking of the rest of
the custom
@@ -163,17 +178,6 @@ class ResourceProfile(
numPartsPerResourceMap(rName) = parts
val numTasks = ((execReq.amount * parts) / numPerTask).toInt
if (taskLimit == -1 || numTasks < taskLimit) {
- if (shouldCheckExecCores) {
- // TODO - until resource profiles full implemented we need to
error if cores not
- // limiting resource because the scheduler code uses that for slots
- throw new IllegalArgumentException("The number of slots on an
executor has to be " +
- "limited by the number of cores, otherwise you waste resources
and " +
- "some scheduling doesn't work properly. Your configuration has "
+
- s"core/task cpu slots = ${taskLimit} and " +
- s"${execReq.resourceName} = ${numTasks}. " +
- "Please adjust your configuration so that all resources require
same number " +
- "of executor slots.")
- }
limitingResource = rName
taskLimit = numTasks
}
@@ -183,12 +187,6 @@ class ResourceProfile(
"no corresponding task resource request was specified.")
}
}
- if(!shouldCheckExecCores && execResourceToCheck.nonEmpty) {
- // if we can't rely on the executor cores config throw a warning for user
- logWarning("Please ensure that the number of slots available on your " +
- "executors is limited by the number of cores to task cpus and not
another " +
- "custom resource.")
- }
if (taskResourcesToCheck.nonEmpty) {
throw new SparkException("No executor resource configs were not
specified for the " +
s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
@@ -319,4 +317,12 @@ object ResourceProfile extends Logging {
rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
rp.executorResources.filterKeys(k =>
!ResourceProfile.allSupportedExecutorResources.contains(k))
}
+
+ /*
+ * Get the number of cpus per task if its set in the profile, otherwise
return the
+ * cpus per task for the default profile.
+ */
+ private[spark] def getTaskCpusOrDefaultForProfile(rp: ResourceProfile, conf:
SparkConf): Int = {
+ rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
+ }
}
diff --git
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 06db946..fabc0bd 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -41,7 +41,6 @@ private[spark] class ResourceProfileManager(sparkConf:
SparkConf) extends Loggin
def defaultResourceProfile: ResourceProfile = defaultProfile
- private val taskCpusDefaultProfile = defaultProfile.getTaskCpus.get
private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
private val master = sparkConf.getOption("spark.master")
private val isNotYarn = master.isDefined && !master.get.equals("yarn")
@@ -64,8 +63,10 @@ private[spark] class ResourceProfileManager(sparkConf:
SparkConf) extends Loggin
isSupported(rp)
// force the computation of maxTasks and limitingResource now so we don't
have cost later
rp.limitingResource(sparkConf)
- logInfo(s"Adding ResourceProfile id: ${rp.id}")
- resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+ val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+ if (res == null) {
+ logInfo(s"Added ResourceProfile id: ${rp.id}")
+ }
}
/*
@@ -79,8 +80,4 @@ private[spark] class ResourceProfileManager(sparkConf:
SparkConf) extends Loggin
}
rp
}
-
- def taskCpusForProfileId(rpId: Int): Int = {
- resourceProfileFromId(rpId).getTaskCpus.getOrElse(taskCpusDefaultProfile)
- }
}
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 2227255..36ef906 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{CPUS_PER_TASK, EXECUTOR_CORES,
RESOURCES_DISCOVERY_PLUGIN, SPARK_TASK_PREFIX}
-import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING}
+import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING,
SKIP_VALIDATE_CORES_TESTING}
import org.apache.spark.util.Utils
/**
@@ -392,7 +392,7 @@ private[spark] object ResourceUtils extends Logging {
s"${resourceRequest.id.resourceName}")
}
- def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+ def validateTaskCpusLargeEnough(sparkConf: SparkConf, execCores: Int,
taskCpus: Int): Boolean = {
// Number of cores per executor must meet at least one task requirement.
if (execCores < taskCpus) {
throw new SparkException(s"The number of cores per executor
(=$execCores) has to be >= " +
@@ -414,7 +414,7 @@ private[spark] object ResourceUtils extends Logging {
val coresKnown = rp.isCoresLimitKnown
var limitingResource = rp.limitingResource(sparkConf)
var maxTaskPerExec = rp.maxTasksPerExecutor(sparkConf)
- val taskCpus = rp.getTaskCpus.getOrElse(sparkConf.get(CPUS_PER_TASK))
+ val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(rp,
sparkConf)
val cores = if (execCores.isDefined) {
execCores.get
} else if (coresKnown) {
@@ -455,11 +455,12 @@ private[spark] object ResourceUtils extends Logging {
taskReq.foreach { case (rName, treq) =>
val execAmount = execReq(rName).amount
+ // handles fractional
+ val taskAmount = rp.getSchedulerTaskResourceAmount(rName)
val numParts = rp.getNumSlotsPerAddress(rName, sparkConf)
- // handle fractional
- val taskAmount = if (numParts > 1) 1 else treq.amount
if (maxTaskPerExec < (execAmount * numParts / taskAmount)) {
- val taskReqStr = s"${taskAmount}/${numParts}"
+ val origTaskAmount = treq.amount
+ val taskReqStr = s"${origTaskAmount}/${numParts}"
val resourceNumSlots = Math.floor(execAmount * numParts /
taskAmount).toInt
val message = s"The configuration of resource: ${treq.resourceName} " +
s"(exec = ${execAmount}, task = ${taskReqStr}, " +
diff --git
a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
index 9624b51..9a5114f 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.resource.ResourceProfile._
/**
- * A set of task resource requests. This is used in conjuntion with the
ResourceProfile to
+ * A set of task resource requests. This is used in conjunction with the
ResourceProfile to
* programmatically specify the resources needed for an RDD that will be
applied at the
* stage level.
*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a226b65..079cf11 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -385,15 +385,17 @@ private[spark] class DAGScheduler(
def createShuffleMapStage[K, V, C](
shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
+ val (shuffleDeps, resourceProfiles) =
getShuffleDependenciesAndResourceProfiles(rdd)
+ val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
checkBarrierStageWithDynamicAllocation(rdd)
- checkBarrierStageWithNumSlots(rdd)
+ checkBarrierStageWithNumSlots(rdd, resourceProfile)
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
val numTasks = rdd.partitions.length
- val parents = getOrCreateParentStages(rdd, jobId)
+ val parents = getOrCreateParentStages(shuffleDeps, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep,
mapOutputTracker,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ resourceProfile.id)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
@@ -433,14 +435,32 @@ private[spark] class DAGScheduler(
* the check fails consecutively beyond a configured number for a job, then
fail current job
* submission.
*/
- private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+ private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile):
Unit = {
val numPartitions = rdd.getNumPartitions
- val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
+ val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed(numPartitions,
maxNumConcurrentTasks)
}
}
+ private[scheduler] def mergeResourceProfilesForStage(
+ stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = {
+ logDebug(s"Merging stage rdd profiles: $stageResourceProfiles")
+ val resourceProfile = if (stageResourceProfiles.size > 1) {
+ // add option later to actually merge profiles - SPARK-29153
+ throw new IllegalArgumentException("Multiple ResourceProfile's specified
in the RDDs for " +
+ "this stage, please resolve the conflicting ResourceProfile's as Spark
doesn't" +
+ "currently support merging them.")
+ } else {
+ if (stageResourceProfiles.size == 1) {
+ stageResourceProfiles.head
+ } else {
+ sc.resourceProfileManager.defaultResourceProfile
+ }
+ }
+ resourceProfile
+ }
+
/**
* Create a ResultStage associated with the provided jobId.
*/
@@ -450,24 +470,27 @@ private[spark] class DAGScheduler(
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
+ val (shuffleDeps, resourceProfiles) =
getShuffleDependenciesAndResourceProfiles(rdd)
+ val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
checkBarrierStageWithDynamicAllocation(rdd)
- checkBarrierStageWithNumSlots(rdd)
+ checkBarrierStageWithNumSlots(rdd, resourceProfile)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
- val parents = getOrCreateParentStages(rdd, jobId)
+ val parents = getOrCreateParentStages(shuffleDeps, jobId)
val id = nextStageId.getAndIncrement()
- val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
callSite,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
+ callSite, resourceProfile.id)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
/**
- * Get or create the list of parent stages for a given RDD. The new Stages
will be created with
- * the provided firstJobId.
+ * Get or create the list of parent stages for the given shuffle
dependencies. The new
+ * Stages will be created with the provided firstJobId.
*/
- private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int):
List[Stage] = {
- getShuffleDependencies(rdd).map { shuffleDep =>
+ private def getOrCreateParentStages(shuffleDeps:
HashSet[ShuffleDependency[_, _, _]],
+ firstJobId: Int): List[Stage] = {
+ shuffleDeps.map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
@@ -485,7 +508,8 @@ private[spark] class DAGScheduler(
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
- getShuffleDependencies(toVisit).foreach { shuffleDep =>
+ val (shuffleDeps, _) =
getShuffleDependenciesAndResourceProfiles(toVisit)
+ shuffleDeps.foreach { shuffleDep =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
ancestors.prepend(shuffleDep)
waitingForVisit.prepend(shuffleDep.rdd)
@@ -497,10 +521,11 @@ private[spark] class DAGScheduler(
}
/**
- * Returns shuffle dependencies that are immediate parents of the given RDD.
+ * Returns shuffle dependencies that are immediate parents of the given RDD
and the
+ * ResourceProfiles associated with the RDDs for this stage.
*
- * This function will not return more distant ancestors. For example, if C
has a shuffle
- * dependency on B which has a shuffle dependency on A:
+ * This function will not return more distant ancestors for shuffle
dependencies. For example,
+ * if C has a shuffle dependency on B which has a shuffle dependency on A:
*
* A <-- B <-- C
*
@@ -508,9 +533,10 @@ private[spark] class DAGScheduler(
*
* This function is scheduler-visible for the purpose of unit testing.
*/
- private[scheduler] def getShuffleDependencies(
- rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
+ private[scheduler] def getShuffleDependenciesAndResourceProfiles(
+ rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]],
HashSet[ResourceProfile]) = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
+ val resourceProfiles = new HashSet[ResourceProfile]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
@@ -518,6 +544,7 @@ private[spark] class DAGScheduler(
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
+ Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
@@ -526,7 +553,7 @@ private[spark] class DAGScheduler(
}
}
}
- parents
+ (parents, resourceProfiles)
}
/**
@@ -1253,7 +1280,8 @@ private[spark] class DAGScheduler(
logInfo(s"Submitting ${tasks.size} missing tasks from $stage
(${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
- tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId,
properties))
+ tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId,
properties,
+ stage.resourceProfileId))
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 4752353..a5bba64 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import org.apache.spark.resource.ResourceProfile
+
/**
* A backend interface for scheduling systems that allows plugging in
different ones under
* TaskSchedulerImpl. We assume a Mesos-like model where the application gets
resource offers as
@@ -80,12 +82,14 @@ private[spark] trait SchedulerBackend {
def getDriverAttributes: Option[Map[String, String]] = None
/**
- * Get the max number of tasks that can be concurrent launched currently.
+ * Get the max number of tasks that can be concurrent launched based on the
ResourceProfile
+ * being used.
* Note that please don't cache the value returned by this method, because
the number can change
* due to add/remove executors.
*
+ * @param rp ResourceProfile which to use to calculate max concurrent tasks.
* @return The max number of tasks that can be concurrent launched currently.
*/
- def maxNumConcurrentTasks(): Int
+ def maxNumConcurrentTasks(rp: ResourceProfile): Int
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1b197c4..7e2fbb4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,7 +31,7 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
-import org.apache.spark.resource.ResourceUtils
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.rpc.RpcEndpoint
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
@@ -93,9 +93,6 @@ private[spark] class TaskSchedulerImpl(
// CPUs to request per task
val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
- // Resources to request per task
- val resourcesReqsPerTask = ResourceUtils.parseResourceRequirements(sc.conf,
SPARK_TASK_PREFIX)
-
// TaskSetManagers are not thread safe, so any access to one should be
synchronized
// on this class. Protected by `this`
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int,
TaskSetManager]]
@@ -209,7 +206,8 @@ private[spark] class TaskSchedulerImpl(
override def submitTasks(taskSet: TaskSet): Unit = {
val tasks = taskSet.tasks
- logInfo("Adding task set " + taskSet.id + " with " + tasks.length + "
tasks")
+ logInfo("Adding task set " + taskSet.id + " with " + tasks.length + "
tasks "
+ + "resource profile " + taskSet.resourceProfileId)
this.synchronized {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
@@ -340,39 +338,49 @@ private[spark] class TaskSchedulerImpl(
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
- if (availableCpus(i) >= CPUS_PER_TASK &&
- resourcesMeetTaskRequirements(availableResources(i))) {
- try {
- for (task <- taskSet.resourceOffer(execId, host, maxLocality,
availableResources(i))) {
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetManager.put(tid, taskSet)
- taskIdToExecutorId(tid) = execId
- executorIdToRunningTaskIds(execId).add(tid)
- availableCpus(i) -= CPUS_PER_TASK
- assert(availableCpus(i) >= 0)
- task.resources.foreach { case (rName, rInfo) =>
- // Remove the first n elements from availableResources
addresses, these removed
- // addresses are the same as that we allocated in
taskSet.resourceOffer() since it's
- // synchronized. We don't remove the exact addresses allocated
because the current
- // approach produces the identical result with less time
complexity.
- availableResources(i).getOrElse(rName,
- throw new SparkException(s"Try to acquire resource $rName that
doesn't exist."))
- .remove(0, rInfo.addresses.size)
- }
- // Only update hosts for a barrier task.
- if (taskSet.isBarrier) {
- // The executor address is expected to be non empty.
- addressesWithDescs += (shuffledOffers(i).address.get -> task)
+ val taskSetRpID = taskSet.taskSet.resourceProfileId
+ // make the resource profile id a hard requirement for now - ie only put
tasksets
+ // on executors where resource profile exactly matches.
+ if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+ val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet,
availableCpus(i),
+ availableResources(i))
+ taskResAssignmentsOpt.foreach { taskResAssignments =>
+ try {
+ val prof =
sc.resourceProfileManager.resourceProfileFromId(taskSetRpID)
+ val taskCpus =
ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
+ val taskDescOption = taskSet.resourceOffer(execId, host,
maxLocality,
+ taskResAssignments)
+ for (task <- taskDescOption) {
+ tasks(i) += task
+ val tid = task.taskId
+ taskIdToTaskSetManager.put(tid, taskSet)
+ taskIdToExecutorId(tid) = execId
+ executorIdToRunningTaskIds(execId).add(tid)
+ availableCpus(i) -= taskCpus
+ assert(availableCpus(i) >= 0)
+ task.resources.foreach { case (rName, rInfo) =>
+ // Remove the first n elements from availableResources
addresses, these removed
+ // addresses are the same as that we allocated in
taskResourceAssignments since it's
+ // synchronized. We don't remove the exact addresses allocated
because the current
+ // approach produces the identical result with less time
complexity.
+ availableResources(i).getOrElse(rName,
+ throw new SparkException(s"Try to acquire resource $rName
that doesn't exist."))
+ .remove(0, rInfo.addresses.size)
+ }
+ // Only update hosts for a barrier task.
+ if (taskSet.isBarrier) {
+ // The executor address is expected to be non empty.
+ addressesWithDescs += (shuffledOffers(i).address.get -> task)
+ }
+ launchedTask = true
}
- launchedTask = true
+ } catch {
+ case e: TaskNotSerializableException =>
+ logError(s"Resource offer failed, task set ${taskSet.name} was
not serializable")
+ // Do not offer resources for this task, but don't throw an
error to allow other
+ // task sets to be submitted.
+ return launchedTask
}
- } catch {
- case e: TaskNotSerializableException =>
- logError(s"Resource offer failed, task set ${taskSet.name} was not
serializable")
- // Do not offer resources for this task, but don't throw an error
to allow other
- // task sets to be submitted.
- return launchedTask
}
}
}
@@ -381,12 +389,81 @@ private[spark] class TaskSchedulerImpl(
/**
* Check whether the resources from the WorkerOffer are enough to run at
least one task.
+ * Returns None if the resources don't meet the task requirements, otherwise
returns
+ * the task resource assignments to give to the next task. Note that the
assignments maybe
+ * be empty if no custom resources are used.
*/
- private def resourcesMeetTaskRequirements(resources: Map[String,
Buffer[String]]): Boolean = {
- val resourcesFree = resources.map(r => r._1 -> r._2.length)
- val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree,
resourcesReqsPerTask)
- logDebug(s"Resources meet task requirements is: $meetsReqs")
- meetsReqs
+ private def resourcesMeetTaskRequirements(
+ taskSet: TaskSetManager,
+ availCpus: Int,
+ availWorkerResources: Map[String, Buffer[String]]
+ ): Option[Map[String, ResourceInformation]] = {
+ val rpId = taskSet.taskSet.resourceProfileId
+ val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+ val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf,
conf)
+ // check if the ResourceProfile has cpus first since that is common case
+ if (availCpus < taskCpus) return None
+ // only look at the resource other then cpus
+ val tsResources = ResourceProfile.getCustomTaskResources(taskSetProf)
+ if (tsResources.isEmpty) return Some(Map.empty)
+ val localTaskReqAssign = HashMap[String, ResourceInformation]()
+ // we go through all resources here so that we can make sure they match
and also get what the
+ // assignments are for the next task
+ for ((rName, taskReqs) <- tsResources) {
+ val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+ availWorkerResources.get(rName) match {
+ case Some(workerRes) =>
+ if (workerRes.size >= taskAmount) {
+ localTaskReqAssign.put(rName, new ResourceInformation(rName,
+ workerRes.take(taskAmount).toArray))
+ } else {
+ return None
+ }
+ case None => return None
+ }
+ }
+ Some(localTaskReqAssign.toMap)
+ }
+
+ // Use the resource that the resourceProfile has as the limiting resource to
calculate the
+ // total number of slots available based on the current offers.
+ private def calculateAvailableSlots(
+ resourceProfileIds: Array[Int],
+ availableCpus: Array[Int],
+ availableResources: Array[Map[String, Buffer[String]]],
+ rpId: Int): Int = {
+ val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+ val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter {
case (id, _) =>
+ (id == resourceProfile.id)
+ }
+ val coresKnown = resourceProfile.isCoresLimitKnown
+ var limitingResource = resourceProfile.limitingResource(conf)
+ val taskCpus =
ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
+
+ offersForResourceProfile.map { case (o, index) =>
+ val numTasksPerExecCores = availableCpus(index) / taskCpus
+ // if limiting resource is empty then we have no other resources, so it
has to be CPU
+ if (limitingResource == ResourceProfile.CPUS ||
limitingResource.isEmpty) {
+ numTasksPerExecCores
+ } else {
+ val taskLimit =
resourceProfile.taskResources.get(limitingResource).map(_.amount)
+ .getOrElse(throw new SparkException("limitingResource returns from
ResourceProfile" +
+ s" $resourceProfile doesn't actually contain that task resource!")
+ )
+ // available addresses already takes into account if there are
fractional
+ // task resource requests
+ val availAddrs =
availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
+ val resourceLimit = (availAddrs / taskLimit).toInt
+ if (!coresKnown) {
+ // when executor cores config isn't set, we can't calculate the real
limiting resource
+ // and number of tasks per executor ahead of time, so calculate it
now based on what
+ // is available.
+ if (numTasksPerExecCores <= resourceLimit) numTasksPerExecCores else
resourceLimit
+ } else {
+ resourceLimit
+ }
+ }
+ }.sum
}
/**
@@ -429,9 +506,12 @@ private[spark] class TaskSchedulerImpl(
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
+ // Note the size estimate here might be off with different
ResourceProfiles but should be
+ // close estimate
val tasks = shuffledOffers.map(o => new
ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableResources = shuffledOffers.map(_.resources).toArray
val availableCpus = shuffledOffers.map(o => o.cores).toArray
+ val resourceProfileIds = shuffledOffers.map(o =>
o.resourceProfileId).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -441,19 +521,27 @@ private[spark] class TaskSchedulerImpl(
}
}
- // Take each TaskSet in our scheduling order, and then offer it each node
in increasing order
+ // Take each TaskSet in our scheduling order, and then offer it to each
node in increasing order
// of locality levels so that it gets a chance to launch local tasks on
all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF,
RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
- val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
+ // we only need to calculate available slots if using barrier
scheduling, otherwise the
+ // value is -1
+ val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
+ val slots = calculateAvailableSlots(resourceProfileIds, availableCpus,
availableResources,
+ taskSet.taskSet.resourceProfileId)
+ slots
+ } else {
+ -1
+ }
// Skip the barrier taskSet if the available slots are less than the
number of pending tasks.
- if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+ if (taskSet.isBarrier && numBarrierSlotsAvailable < taskSet.numTasks) {
// Skip the launch process.
// TODO SPARK-24819 If the job requires more slots than available
(both busy and free
// slots), fail the job on submit.
logInfo(s"Skip current round of resource offers for barrier stage
${taskSet.stageId} " +
s"because the barrier taskSet requires ${taskSet.numTasks} slots,
while the total " +
- s"number of available slots is $availableSlots.")
+ s"number of available slots is $numBarrierSlotsAvailable.")
} else {
var launchedAnyTask = false
// Record all the executor IDs assigned barrier tasks on.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 517c899..7a8ed16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -28,7 +28,8 @@ private[spark] class TaskSet(
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
- val properties: Properties) {
+ val properties: Properties,
+ val resourceProfileId: Int) {
val id: String = stageId + "." + stageAttemptId
override def toString: String = "TaskSet " + id
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 18684ee..2c79233 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -90,10 +90,18 @@ private[spark] class TaskSetManager(
// SPARK-30417: #cores per executor might not be set in spark conf for
standalone mode, then
// the value of the conf would 1 by default. However, the executor would use
all the cores on
// the worker. Therefore, CPUS_PER_TASK is okay to be greater than 1 without
setting #cores.
- // To handle this case, we assume the minimum number of slots is 1.
+ // To handle this case, we set slots to 1 when we don't know the executor
cores.
// TODO: use the actual number of slots for standalone mode.
- val speculationTasksLessEqToSlots =
- numTasks <= Math.max(conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK, 1)
+ val speculationTasksLessEqToSlots = {
+ val rpId = taskSet.resourceProfileId
+ val resourceProfile =
sched.sc.resourceProfileManager.resourceProfileFromId(rpId)
+ val slots = if (!resourceProfile.isCoresLimitKnown) {
+ 1
+ } else {
+ resourceProfile.maxTasksPerExecutor(conf)
+ }
+ numTasks <= slots
+ }
// For each task, tracks whether a copy of the task has succeeded. A task
will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it
should not
@@ -394,7 +402,7 @@ private[spark] class TaskSetManager(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality,
- availableResources: Map[String, Seq[String]] = Map.empty)
+ taskResourceAssignments: Map[String, ResourceInformation] = Map.empty)
: Option[TaskDescription] =
{
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
@@ -457,18 +465,8 @@ private[spark] class TaskSetManager(
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, executor
${info.executorId}, " +
- s"partition ${task.partitionId}, $taskLocality,
${serializedTask.limit()} bytes)")
-
- val extraResources = sched.resourcesReqsPerTask.map { taskReq =>
- val rName = taskReq.resourceName
- val count = taskReq.amount
- val rAddresses = availableResources.getOrElse(rName, Seq.empty)
- assert(rAddresses.size >= count, s"Required $count $rName addresses,
but only " +
- s"${rAddresses.size} available.")
- // We'll drop the allocated addresses later inside TaskSchedulerImpl.
- val allocatedAddresses = rAddresses.take(count)
- (rName, new ResourceInformation(rName, allocatedAddresses.toArray))
- }.toMap
+ s"partition ${task.partitionId}, $taskLocality,
${serializedTask.limit()} bytes) " +
+ s"taskResourceAssignments ${taskResourceAssignments}")
sched.dagScheduler.taskStarted(task, info)
new TaskDescription(
@@ -481,7 +479,7 @@ private[spark] class TaskSetManager(
addedFiles,
addedJars,
task.localProperties,
- extraResources,
+ taskResourceAssignments,
serializedTask)
}
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index 522dbfa..92a12f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler
import scala.collection.mutable.Buffer
+import org.apache.spark.resource.ResourceProfile
+
/**
* Represents free resources available on an executor.
*/
@@ -30,4 +32,5 @@ case class WorkerOffer(
// `address` is an optional hostPort string, it provide more useful
information than `host`
// when multiple executors are launched on the same host.
address: Option[String] = None,
- resources: Map[String, Buffer[String]] = Map.empty)
+ resources: Map[String, Buffer[String]] = Map.empty,
+ resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6e1efda..cca8e86 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -145,7 +145,10 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
- executorInfo.freeCores += scheduler.CPUS_PER_TASK
+ val rpId = executorInfo.resourceProfileId
+ val prof =
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+ val taskCpus =
ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
+ executorInfo.freeCores += taskCpus
resources.foreach { case (k, v) =>
executorInfo.resourcesInfo.get(k).foreach { r =>
r.release(v.addresses)
@@ -231,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val resourcesInfo = resources.map { case (rName, info) =>
- // tell the executor it can schedule resources up to numParts
times,
+ // tell the executor it can schedule resources up to
numSlotsPerAddress times,
// as configured by the user, or set to 1 as that is the default
(1 task/resource)
val numParts = scheduler.sc.resourceProfileManager
.resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
@@ -298,7 +301,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
Some(executorData.executorAddress.hostPort),
executorData.resourcesInfo.map { case (rName, rInfo) =>
(rName, rInfo.availableAddrs.toBuffer)
- })
+ }, executorData.resourceProfileId)
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
@@ -327,7 +330,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
Some(executorData.executorAddress.hostPort),
executorData.resourcesInfo.map { case (rName, rInfo) =>
(rName, rInfo.availableAddrs.toBuffer)
- }))
+ }, executorData.resourceProfileId))
scheduler.resourceOffers(workOffers)
} else {
Seq.empty
@@ -359,7 +362,10 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
val executorData = executorDataMap(task.executorId)
// Do resources allocation here. The allocated resources will get
released after the task
// finishes.
- executorData.freeCores -= scheduler.CPUS_PER_TASK
+ val rpId = executorData.resourceProfileId
+ val prof =
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+ val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof,
conf)
+ executorData.freeCores -= taskCpus
task.resources.foreach { case (rName, rInfo) =>
assert(executorData.resourcesInfo.contains(rName))
executorData.resourcesInfo(rName).acquire(rInfo.addresses)
@@ -606,10 +612,10 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
}
- override def maxNumConcurrentTasks(): Int = synchronized {
- executorDataMap.values.map { executor =>
- executor.totalCores / scheduler.CPUS_PER_TASK
- }.sum
+ override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+ val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
+ val executorsWithResourceProfile =
executorDataMap.values.filter(_.resourceProfileId == rp.id)
+ executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
}
// this function is for testing only
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 42a5afe..e2b1198 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv,
ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -162,7 +162,12 @@ private[spark] class LocalSchedulerBackend(
override def applicationId(): String = appId
- override def maxNumConcurrentTasks(): Int = totalCores /
scheduler.CPUS_PER_TASK
+ // Doesn't support different ResourceProfiles yet
+ // so we expect all executors to be of same ResourceProfile
+ override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+ val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
+ totalCores / cpusPerTask
+ }
private def stop(finalState: SparkAppHandle.State): Unit = {
localEndpoint.ask(StopExecutor)
diff --git
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index f8b9930..0b2a58d 100644
---
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -511,7 +511,6 @@ class StandaloneDynamicAllocationSuite
val taskScheduler = mock(classOf[TaskSchedulerImpl])
when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
when(taskScheduler.resourceOffers(any())).thenReturn(Nil)
- when(taskScheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
when(taskScheduler.sc).thenReturn(sc)
val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf,
securityManager)
diff --git
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 3134a73..e0b5860 100644
---
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -301,8 +301,8 @@ class CoarseGrainedExecutorBackendSuite extends
SparkFunSuite
val taskId = 1000000
// We don't really verify the data, just pass it around.
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
- val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000",
- 19, 1, mutable.Map.empty, mutable.Map.empty, new Properties,
+ val taskDescription = new TaskDescription(taskId, 2, "1", "TASK
1000000", 19,
+ 1, mutable.Map.empty, mutable.Map.empty, new Properties,
Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
val serializedTaskDescription = TaskDescription.encode(taskDescription)
backend.executor = mock[Executor]
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 7666c6c..f4745db 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -72,7 +72,7 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
// Ensure all executors have been launched.
assert(sc.getExecutorIds().length == 4)
}
- assert(sc.maxNumConcurrentTasks() == 12)
+
assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf))
== 12)
}
test("compute max number of concurrent tasks can be launched when
spark.task.cpus > 1") {
@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
assert(sc.getExecutorIds().length == 4)
}
// Each executor can only launch one task since `spark.task.cpus` is 2.
- assert(sc.maxNumConcurrentTasks() == 4)
+
assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf))
== 4)
}
test("compute max number of concurrent tasks can be launched when some
executors are busy") {
@@ -126,7 +126,8 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
assert(taskStarted.get())
assert(taskEnded.get() == false)
// Assert we count in slots on both busy and free executors.
- assert(sc.maxNumConcurrentTasks() == 4)
+ assert(
+
sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 4)
}
} finally {
sc.removeSparkListener(listener)
@@ -187,8 +188,11 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
}
test("extra resources from executor") {
+ import TestUtils._
+
+ val execCores = 3
val conf = new SparkConf()
- .set(EXECUTOR_CORES, 1)
+ .set(EXECUTOR_CORES, execCores)
.set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive
during test
.set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor
registrations
.setMaster(
@@ -294,7 +298,6 @@ private class CSMockExternalClusterManager extends
ExternalClusterManager {
when(ts.applicationAttemptId()).thenReturn(Some("attempt1"))
when(ts.schedulingMode).thenReturn(SchedulingMode.FIFO)
when(ts.nodeBlacklist()).thenReturn(Set.empty[String])
- when(ts.resourcesReqsPerTask).thenReturn(Seq.empty)
ts
}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4486389..33a14ce 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.config
import org.apache.spark.rdd.{DeterministicLevel, RDD}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile,
ResourceProfileBuilder, TaskResourceRequests}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.{FetchFailedException,
MetadataFetchFailedException}
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -2547,9 +2548,9 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with TimeLi
/**
* Checks the DAGScheduler's internal logic for traversing an RDD DAG by
making sure that
- * getShuffleDependencies correctly returns the direct shuffle dependencies
of a particular
- * RDD. The test creates the following RDD graph (where n denotes a narrow
dependency and s
- * denotes a shuffle dependency):
+ * getShuffleDependenciesAndResourceProfiles correctly returns the direct
shuffle dependencies
+ * of a particular RDD. The test creates the following RDD graph (where n
denotes a narrow
+ * dependency and s denotes a shuffle dependency):
*
* A <------------s---------,
* \
@@ -2558,7 +2559,7 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with TimeLi
* Here, the direct shuffle dependency of C is just the shuffle dependency
on B. The direct
* shuffle dependencies of E are the shuffle dependency on A and the shuffle
dependency on C.
*/
- test("getShuffleDependencies correctly returns only direct shuffle parents")
{
+ test("getShuffleDependenciesAndResourceProfiles correctly returns only
direct shuffle parents") {
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
val rddB = new MyRDD(sc, 2, Nil)
@@ -2569,11 +2570,16 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with TimeLi
val narrowDepD = new OneToOneDependency(rddD)
val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker =
mapOutputTracker)
- assert(scheduler.getShuffleDependencies(rddA) === Set())
- assert(scheduler.getShuffleDependencies(rddB) === Set())
- assert(scheduler.getShuffleDependencies(rddC) === Set(shuffleDepB))
- assert(scheduler.getShuffleDependencies(rddD) === Set(shuffleDepC))
- assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA,
shuffleDepC))
+ val (shuffleDepsA, _) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddA)
+ assert(shuffleDepsA === Set())
+ val (shuffleDepsB, _) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddB)
+ assert(shuffleDepsB === Set())
+ val (shuffleDepsC, _) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddC)
+ assert(shuffleDepsC === Set(shuffleDepB))
+ val (shuffleDepsD, _) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddD)
+ assert(shuffleDepsD === Set(shuffleDepC))
+ val (shuffleDepsE, _) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddE)
+ assert(shuffleDepsE === Set(shuffleDepA, shuffleDepC))
}
test("SPARK-17644: After one stage is aborted for too many failed attempts,
subsequent stages" +
@@ -3141,6 +3147,97 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with TimeLi
assertDataStructuresEmpty()
}
+ test("test default resource profile") {
+ val rdd = sc.parallelize(1 to 10).map(x => (x, x))
+ val (shuffledeps, resourceprofiles) =
scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+ val rp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+ assert(rp.id ==
scheduler.sc.resourceProfileManager.defaultResourceProfile.id)
+ }
+
+ test("test 1 resource profile") {
+ val ereqs = new ExecutorResourceRequests().cores(4)
+ val treqs = new TaskResourceRequests().cpus(1)
+ val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+ val rdd = sc.parallelize(1 to 10).map(x => (x, x)).withResources(rp1)
+ val (shuffledeps, resourceprofiles) =
scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+ val rpMerged = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+ val expectedid = Option(rdd.getResourceProfile).map(_.id)
+ assert(expectedid.isDefined)
+ assert(expectedid.get != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ assert(rpMerged.id == expectedid.get)
+ }
+
+ test("test 2 resource profiles errors by default") {
+ import org.apache.spark.resource._
+ val ereqs = new ExecutorResourceRequests().cores(4)
+ val treqs = new TaskResourceRequests().cpus(1)
+ val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+ val ereqs2 = new ExecutorResourceRequests().cores(2)
+ val treqs2 = new TaskResourceRequests().cpus(2)
+ val rp2 = new
ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+ val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x,
x)).withResources(rp2)
+ val error = intercept[IllegalArgumentException] {
+ val (shuffledeps, resourceprofiles) =
scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+ scheduler.mergeResourceProfilesForStage(resourceprofiles)
+ }.getMessage()
+
+ assert(error.contains("Multiple ResourceProfile's specified in the RDDs"))
+ }
+
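The three tests above exercise a simple merge rule: collect the distinct ResourceProfiles attached to the RDDs of a stage, fall back to the default profile when none were set, and, until SPARK-29153 adds real merging, refuse to choose between two different ones. A rough sketch of that rule, using a hypothetical mergeForStage helper rather than the scheduler's actual method:

    import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}

    // Illustration only; the committed logic lives in
    // DAGScheduler.mergeResourceProfilesForStage.
    def mergeForStage(
        profiles: Set[ResourceProfile],
        manager: ResourceProfileManager): ResourceProfile = {
      if (profiles.isEmpty) {
        // No RDD in the stage asked for anything special: use the default profile.
        manager.defaultResourceProfile
      } else if (profiles.size == 1) {
        profiles.head
      } else {
        // Real merging is deferred to SPARK-29153.
        throw new IllegalArgumentException(
          "Multiple ResourceProfile's specified in the RDDs for this stage")
      }
    }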
+ /**
+ * Checks the DAGScheduler's internal logic for traversing an RDD DAG by
making sure that
+ * getShuffleDependenciesAndResourceProfiles correctly returns the direct
shuffle dependencies
+ * of a particular RDD. The test creates the following RDD graph (where n
denotes a narrow
+ * dependency and s denotes a shuffle dependency):
+ *
+ * A <------------s---------,
+ * \
+ * B <--s-- C <--s-- D <--n------ E
+ *
+ * Here, the direct shuffle dependency of C is just the shuffle dependency
on B. The direct
+ * shuffle dependencies of E are the shuffle dependency on A and the shuffle
dependency on C.
+ */
+ test("getShuffleDependenciesAndResourceProfiles returns deps and profiles
correctly") {
+ import org.apache.spark.resource._
+ val ereqs = new ExecutorResourceRequests().cores(4)
+ val treqs = new TaskResourceRequests().cpus(1)
+ val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+ val ereqs2 = new ExecutorResourceRequests().cores(6)
+ val treqs2 = new TaskResourceRequests().cpus(2)
+ val rp2 = new
ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+ val rddWithRp = new MyRDD(sc, 2, Nil).withResources(rp1)
+ val rddA = new MyRDD(sc, 2, Nil).withResources(rp1)
+ val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
+ val rddB = new MyRDD(sc, 2, Nil)
+ val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
+ val rddWithRpDep = new OneToOneDependency(rddWithRp)
+ val rddC = new MyRDD(sc, 1, List(rddWithRpDep,
shuffleDepB)).withResources(rp2)
+ val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
+ val rddD = new MyRDD(sc, 1, List(shuffleDepC))
+ val narrowDepD = new OneToOneDependency(rddD)
+ val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker =
mapOutputTracker)
+
+ val (shuffleDepsA, rprofsA) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddA)
+ assert(shuffleDepsA === Set())
+ assert(rprofsA === Set(rp1))
+ val (shuffleDepsB, rprofsB) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddB)
+ assert(shuffleDepsB === Set())
+ assert(rprofsB === Set())
+ val (shuffleDepsC, rprofsC) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddC)
+ assert(shuffleDepsC === Set(shuffleDepB))
+ assert(rprofsC === Set(rp1, rp2))
+ val (shuffleDepsD, rprofsD) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddD)
+ assert(shuffleDepsD === Set(shuffleDepC))
+ assert(rprofsD === Set())
+ val (shuffleDepsE, rprofsE) =
scheduler.getShuffleDependenciesAndResourceProfiles(rddE)
+ assert(shuffleDepsE === Set(shuffleDepA, shuffleDepC))
+ assert(rprofsE === Set())
+ }
+
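The expected sets in the test above follow from one traversal: starting from the given RDD, walk its narrow-dependency ancestors, record each ShuffleDependency encountered without crossing it, and collect the ResourceProfile of every RDD visited. A minimal sketch of that traversal, assuming the rdd.dependencies and rdd.getResourceProfile accessors used in these tests and a hypothetical helper name:

    import scala.collection.mutable
    import org.apache.spark.ShuffleDependency
    import org.apache.spark.rdd.RDD
    import org.apache.spark.resource.ResourceProfile

    // Illustration of the documented behaviour of
    // getShuffleDependenciesAndResourceProfiles, not the committed code.
    def shuffleDepsAndProfiles(root: RDD[_])
        : (Set[ShuffleDependency[_, _, _]], Set[ResourceProfile]) = {
      val shuffleDeps = mutable.Set[ShuffleDependency[_, _, _]]()
      val profiles = mutable.Set[ResourceProfile]()
      val visited = mutable.Set[RDD[_]]()
      val toVisit = mutable.Stack[RDD[_]](root)
      while (toVisit.nonEmpty) {
        val rdd = toVisit.pop()
        if (visited.add(rdd)) {
          Option(rdd.getResourceProfile).foreach(profiles += _)
          rdd.dependencies.foreach {
            case shuffleDep: ShuffleDependency[_, _, _] =>
              // Direct shuffle parent: record it but do not traverse past it.
              shuffleDeps += shuffleDep
            case dep =>
              toVisit.push(dep.rdd)
          }
        }
      }
      (shuffleDeps.toSet, profiles.toSet)
    }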
/**
* Assert that the supplied TaskSet has exactly the given hosts as its
preferred locations.
* Note that this checks only the host and not the executor ID.
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 9f593e0..7ead51b 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.Map
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite}
import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2
@@ -71,7 +72,7 @@ private class DummySchedulerBackend extends SchedulerBackend {
def stop(): Unit = {}
def reviveOffers(): Unit = {}
def defaultParallelism(): Int = 1
- def maxNumConcurrentTasks(): Int = 0
+ def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0
}
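maxNumConcurrentTasks is now answered per ResourceProfile, so a backend is expected to count only the executors launched for the requested profile. The dummy above can keep returning 0; for a real backend the computation is roughly the following, shown with hypothetical inputs (a map from profile id to the free cores of each matching executor) rather than the actual CoarseGrainedSchedulerBackend fields:

    import org.apache.spark.resource.ResourceProfile

    // Sketch only: cores-based slots per profile. The committed implementation
    // also considers custom resources (e.g. GPUs) and uses whichever resource
    // in the profile is the most constraining.
    def maxNumConcurrentTasksFor(
        rp: ResourceProfile,
        freeCoresByProfile: Map[Int, Seq[Int]],
        cpusPerTask: Int): Int = {
      freeCoresByProfile.getOrElse(rp.id, Seq.empty)
        .map(_ / cpusPerTask)
        .sum
    }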
private class DummyTaskScheduler extends TaskScheduler {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 8cb6268..9ec088a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.resource.ResourceProfile
class FakeTask(
stageId: Int,
@@ -42,7 +43,12 @@ object FakeTask {
* locations for each task (given as varargs) if this sequence is not empty.
*/
def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
- createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0,
prefLocs: _*)
+ createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*)
+ }
+
+ def createTaskSet(numTasks: Int, rpId: Int, prefLocs: Seq[TaskLocation]*):
TaskSet = {
+ createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0,
rpId, prefLocs: _*)
}
def createTaskSet(
@@ -50,7 +56,8 @@ object FakeTask {
stageId: Int,
stageAttemptId: Int,
prefLocs: Seq[TaskLocation]*): TaskSet = {
- createTaskSet(numTasks, stageId, stageAttemptId, priority = 0, prefLocs:
_*)
+ createTaskSet(numTasks, stageId, stageAttemptId, priority = 0,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*)
}
def createTaskSet(
@@ -58,6 +65,7 @@ object FakeTask {
stageId: Int,
stageAttemptId: Int,
priority: Int,
+ rpId: Int,
prefLocs: Seq[TaskLocation]*): TaskSet = {
if (prefLocs.size != 0 && prefLocs.size != numTasks) {
throw new IllegalArgumentException("Wrong number of task locations")
@@ -65,7 +73,7 @@ object FakeTask {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
}
- new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
+ new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null,
rpId)
}
def createShuffleMapTaskSet(
@@ -91,11 +99,21 @@ object FakeTask {
}, prefLocs(i), new Properties,
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
}
- new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
+ new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
}
def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*):
TaskSet = {
- createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority =
0, prefLocs: _*)
+ createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority =
0,
+ rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*)
+ }
+
+ def createBarrierTaskSet(
+ numTasks: Int,
+ rpId: Int,
+ prefLocs: Seq[TaskLocation]*): TaskSet = {
+ createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority =
0,
+ rpId = rpId, prefLocs: _*)
}
def createBarrierTaskSet(
@@ -103,6 +121,7 @@ object FakeTask {
stageId: Int,
stageAttemptId: Int,
priority: Int,
+ rpId: Int,
prefLocs: Seq[TaskLocation]*): TaskSet = {
if (prefLocs.size != 0 && prefLocs.size != numTasks) {
throw new IllegalArgumentException("Wrong number of task locations")
@@ -110,6 +129,6 @@ object FakeTask {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil,
isBarrier = true)
}
- new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
+ new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null,
rpId)
}
}
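With the rpId parameter threaded through these factories, a test can pin a TaskSet to a specific profile and the scheduler will only consider worker offers carrying the same profile id; the default-profile overloads keep existing call sites unchanged. For reference, the call forms used later in this patch (rp is assumed to be a custom ResourceProfile already registered with the ResourceProfileManager):

    // Default profile, as before.
    val plain = FakeTask.createTaskSet(3)

    // Pinned to a custom profile: only executors registered with rp.id
    // will be offered these tasks.
    val pinned = FakeTask.createTaskSet(5, stageId = 1, stageAttemptId = 0,
      priority = 0, rpId = rp.id)
    val barrier = FakeTask.createBarrierTaskSet(3, rpId = rp.id)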
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index b953add..d9de976 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite}
import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.SchedulingMode._
/**
@@ -39,7 +40,8 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
new FakeTask(stageId, i, Nil)
}
- new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null),
0)
+ new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), 0)
}
def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId:
Int): Unit = {
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index dff8975..0874163 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SCHEDULER_REVIVE_INTERVAL
import org.apache.spark.rdd.RDD
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
/**
@@ -385,7 +386,7 @@ private[spark] abstract class MockBackend(
}.toIndexedSeq
}
- override def maxNumConcurrentTasks(): Int = 0
+ override def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0
/**
* This is called by the scheduler whenever it has tasks it would like to
schedule, when a tasks
@@ -406,9 +407,9 @@ private[spark] abstract class MockBackend(
(taskDescription, task)
}
newTasks.foreach { case (taskDescription, _) =>
+ freeCores -= taskScheduler.CPUS_PER_TASK
executorIdToExecutor(taskDescription.executorId).freeCores -=
taskScheduler.CPUS_PER_TASK
}
- freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK
assignedTasksWaitingToRun ++= newTasks
}
}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e7ecf84..9ee84a8 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -31,6 +31,7 @@ import org.scalatestplus.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile,
TaskResourceRequests}
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.util.ManualClock
@@ -40,7 +41,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
def stop(): Unit = {}
def reviveOffers(): Unit = {}
def defaultParallelism(): Int = 1
- def maxNumConcurrentTasks(): Int = 0
+ def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0
}
class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with
BeforeAndAfterEach
@@ -202,7 +203,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
config.CPUS_PER_TASK.key -> taskCpus.toString)
val numFreeCores = 1
val taskSet = new TaskSet(
- Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0,
1)), 0, 0, 0, null)
+ Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0,
1)),
+ 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0",
"host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
@@ -216,7 +218,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
// still be processed without error
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
val taskSet2 = new TaskSet(
- Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0,
1)), 1, 0, 0, null)
+ Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0,
1)),
+ 1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
taskScheduler.submitTasks(taskSet2)
taskDescriptions =
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
@@ -1135,6 +1138,96 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
assert(0 === taskDescriptions.length)
}
+ test("don't schedule for a barrier taskSet if available slots are less than
" +
+ "pending tasks gpus limiting") {
+ val taskCpus = 1
+ val taskScheduler = setupSchedulerWithMaster(
+ s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString,
+ "spark.executor.resource.gpu.amount" -> "1",
"spark.task.resource.gpu.amount" -> "1")
+
+ val numFreeCores = 3
+ val workerOffers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", numFreeCores,
Some("192.168.0.101:49625"),
+ Map("gpu" -> Seq("0").toBuffer)),
+ new WorkerOffer("executor1", "host1", numFreeCores,
Some("192.168.0.101:49627"),
+ Map("gpu" -> Seq("0").toBuffer)))
+ val attempt1 = FakeTask.createBarrierTaskSet(3)
+
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(0 === taskDescriptions.length)
+ }
+
+ test("schedule tasks for a barrier taskSet if all tasks can be launched
together gpus") {
+ val taskCpus = 1
+ val taskScheduler = setupSchedulerWithMaster(
+ s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString,
+ "spark.executor.resource.gpu.amount" -> "1",
"spark.task.resource.gpu.amount" -> "1")
+
+ val numFreeCores = 3
+ val workerOffers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", numFreeCores,
Some("192.168.0.101:49625"),
+ Map("gpu" -> Seq("0").toBuffer)),
+ new WorkerOffer("executor1", "host1", numFreeCores,
Some("192.168.0.101:49627"),
+ Map("gpu" -> Seq("0").toBuffer)),
+ new WorkerOffer("executor2", "host2", numFreeCores,
Some("192.168.0.101:49629"),
+ Map("gpu" -> Seq("0").toBuffer)))
+ val attempt1 = FakeTask.createBarrierTaskSet(3)
+
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(3 === taskDescriptions.length)
+ }
+
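The pass/fail split between the two GPU barrier tests above is pure slot counting: each offer contributes min(freeCores / cpusPerTask, freeGpus / gpusPerTask) slots for its profile, and a barrier set of three tasks only launches once the matching offers add up to at least three slots. A small worked sketch of that arithmetic (hypothetical helper, not the scheduler's method):

    // Slots contributed by one offer, given the task requirements of its profile.
    def slotsInOffer(freeCores: Int, cpusPerTask: Int, freeGpus: Int, gpusPerTask: Int): Int =
      math.min(freeCores / cpusPerTask,
        if (gpusPerTask > 0) freeGpus / gpusPerTask else Int.MaxValue)

    // Two executors with 3 free cores and 1 GPU each, tasks needing 1 cpu + 1 gpu:
    // 1 + 1 = 2 slots, so the 3-task barrier stage is not scheduled.
    val twoExecutors = Seq((3, 1), (3, 1)).map { case (c, g) => slotsInOffer(c, 1, g, 1) }.sum

    // A third identical executor brings the total to 3 slots and the stage launches.
    val threeExecutors = Seq((3, 1), (3, 1), (3, 1)).map { case (c, g) => slotsInOffer(c, 1, g, 1) }.sum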
+ // barrier scheduling doesn't yet work with dynamic allocation, but test it with another
+ // ResourceProfile anyway to make sure the code path works when it is supported
+ test("schedule tasks for a barrier taskSet if all tasks can be launched
together " +
+ "diff ResourceProfile") {
+ val taskCpus = 1
+ val taskScheduler = setupSchedulerWithMaster(
+ s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString)
+ val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2)
+ val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+ val rp = new ResourceProfile(execReqs.requests, taskReqs.requests)
+ taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+ val numFreeCores = 2
+ val workerOffers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", numFreeCores,
Some("192.168.0.101:49625"),
+ Map("gpu" -> Seq("0", "1").toBuffer), rp.id),
+ new WorkerOffer("executor1", "host1", numFreeCores,
Some("192.168.0.101:49627"),
+ Map("gpu" -> Seq("0", "1").toBuffer), rp.id))
+ val attempt1 = FakeTask.createBarrierTaskSet(3, rpId = rp.id)
+
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(3 === taskDescriptions.length)
+ }
+
+ test("schedule tasks for a barrier taskSet if all tasks can be launched
together " +
+ "diff ResourceProfile, but not enough gpus") {
+ val taskCpus = 1
+ val taskScheduler = setupSchedulerWithMaster(
+ s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString)
+ val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2)
+ val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+ val rp = new ResourceProfile(execReqs.requests, taskReqs.requests)
+ taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+ val numFreeCores = 2
+ // give each worker offer only 1 GPU, so there are not enough GPUs for the barrier tasks
+ val workerOffers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", numFreeCores,
Some("192.168.0.101:49625"),
+ Map("gpu" -> Seq("0").toBuffer), rp.id),
+ new WorkerOffer("executor1", "host1", numFreeCores,
Some("192.168.0.101:49627"),
+ Map("gpu" -> Seq("0").toBuffer), rp.id))
+ val attempt1 = FakeTask.createBarrierTaskSet(3, rpId = rp.id)
+
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(0 === taskDescriptions.length)
+ }
+
test("schedule tasks for a barrier taskSet if all tasks can be launched
together") {
val taskCpus = 2
val taskScheduler = setupSchedulerWithMaster(
@@ -1165,8 +1258,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
new WorkerOffer("executor0", "host0", numFreeCores,
Some("192.168.0.101:49625")),
new WorkerOffer("executor1", "host1", numFreeCores,
Some("192.168.0.101:49627")),
new WorkerOffer("executor2", "host2", numFreeCores,
Some("192.168.0.101:49629")))
- val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId
= 0, priority = 1)
- val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0,
priority = 0)
+ val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId
= 0, priority = 1,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0,
priority = 0,
+ rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
// submit highPrio and barrier taskSet
taskScheduler.submitTasks(highPrio)
@@ -1289,6 +1384,93 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
assert(ArrayBuffer("1") ===
taskDescriptions(1).resources.get(GPU).get.addresses)
}
+ test("Scheduler correctly accounts for GPUs per task with fractional
amount") {
+ val taskCpus = 1
+ val taskGpus = 0.33
+ val executorGpus = 1
+ val executorCpus = 4
+
+ val taskScheduler = setupScheduler(numCores = executorCpus,
+ config.CPUS_PER_TASK.key -> taskCpus.toString,
+ TASK_GPU_ID.amountConf -> taskGpus.toString,
+ EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+ config.EXECUTOR_CORES.key -> executorCpus.toString)
+ val taskSet = FakeTask.createTaskSet(5)
+
+ val numFreeCores = 4
+ val resources = Map(GPU -> ArrayBuffer("0", "0", "0"))
+ val singleCoreWorkerOffers =
+ IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None,
resources))
+
+ taskScheduler.submitTasks(taskSet)
+ // Launch tasks on executor that satisfies resource requirements.
+ var taskDescriptions =
taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+ assert(3 === taskDescriptions.length)
+ assert(!failedTaskSet)
+ assert(ArrayBuffer("0") ===
taskDescriptions(0).resources.get(GPU).get.addresses)
+ assert(ArrayBuffer("0") ===
taskDescriptions(1).resources.get(GPU).get.addresses)
+ assert(ArrayBuffer("0") ===
taskDescriptions(2).resources.get(GPU).get.addresses)
+ }
+
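The shape of the offer in the fractional test above is the interesting part: with spark.task.resource.gpu.amount = 0.33, a single physical GPU can be shared by floor(1 / 0.33) = 3 tasks, which is why the offer lists address "0" three times and why exactly three of the five submitted tasks launch, each reporting address "0".

    // Tasks that can share one GPU at a fractional amount of 0.33:
    val tasksPerGpu = (1.0 / 0.33).toInt  // == 3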
+ test("Scheduler works with multiple ResourceProfiles and gpus") {
+ val taskCpus = 1
+ val taskGpus = 1
+ val executorGpus = 4
+ val executorCpus = 4
+
+ val taskScheduler = setupScheduler(numCores = executorCpus,
+ config.CPUS_PER_TASK.key -> taskCpus.toString,
+ TASK_GPU_ID.amountConf -> taskGpus.toString,
+ EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+ config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+ val ereqs = new ExecutorResourceRequests().cores(6).resource(GPU, 6)
+ val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2)
+ val rp = new ResourceProfile(ereqs.requests, treqs.requests)
+ taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+ val taskSet = FakeTask.createTaskSet(3)
+ val rpTaskSet = FakeTask.createTaskSet(5, stageId = 1, stageAttemptId = 0,
+ priority = 0, rpId = rp.id)
+
+ val resourcesDefaultProf = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
+ val resources = Map(GPU -> ArrayBuffer("4", "5", "6", "7", "8", "9"))
+
+ val workerOffers =
+ IndexedSeq(new WorkerOffer("executor0", "host0", 2, None,
resourcesDefaultProf),
+ new WorkerOffer("executor1", "host1", 6, None, resources, rp.id))
+ taskScheduler.submitTasks(taskSet)
+ taskScheduler.submitTasks(rpTaskSet)
+ // should have 2 tasks for the default profile and 3 for the additional resource profile
+ var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ assert(5 === taskDescriptions.length)
+ var has2Gpus = 0
+ var has1Gpu = 0
+ for (tDesc <- taskDescriptions) {
+ assert(tDesc.resources.contains(GPU))
+ if (tDesc.resources(GPU).addresses.size == 2) {
+ has2Gpus += 1
+ }
+ if (tDesc.resources(GPU).addresses.size == 1) {
+ has1Gpu += 1
+ }
+ }
+ assert(has2Gpus == 3)
+ assert(has1Gpu == 2)
+
+ val resources3 = Map(GPU -> ArrayBuffer("14", "15", "16", "17", "18",
"19"))
+
+ // clear the first 2 worker offers so they don't have any room and add a
third
+ // for the resource profile
+ val workerOffers3 = IndexedSeq(
+ new WorkerOffer("executor0", "host0", 0, None, Map.empty),
+ new WorkerOffer("executor1", "host1", 0, None, Map.empty, rp.id),
+ new WorkerOffer("executor2", "host2", 6, None, resources3, rp.id))
+ taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten
+ assert(2 === taskDescriptions.length)
+ assert(taskDescriptions.head.resources.contains(GPU))
+ assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
+ }
+
/**
* Used by tests to simulate a task failure. This calls the failure handler
explicitly, to ensure
* that all the state is updated when this method returns. Otherwise,
there's no way to know when
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index b740e35..4566e3c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -34,6 +34,8 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.SKIP_VALIDATE_CORES_TESTING
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -213,7 +215,6 @@ class TaskSetManagerSuite
super.afterEach()
}
-
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
@@ -657,7 +658,8 @@ class TaskSetManagerSuite
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
- val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
+ val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0,
+ null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
assert(!manager.emittedTaskSizeWarning)
@@ -672,7 +674,8 @@ class TaskSetManagerSuite
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = new TaskSet(
- Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0,
1)), 0, 0, 0, null)
+ Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0,
1)),
+ 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
intercept[TaskNotSerializableException] {
@@ -743,7 +746,8 @@ class TaskSetManagerSuite
val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
override def index: Int = 0
}, Seq(TaskLocation("host1", "execA")), new Properties, null)
- val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)
+ val taskSet = new TaskSet(Array(singleTask), 0, 0, 0,
+ null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
// Offer host1, which should be accepted as a PROCESS_LOCAL location
@@ -1053,10 +1057,10 @@ class TaskSetManagerSuite
}
// Offer resources for 4 tasks to start
for ((k, v) <- List(
- "exec1" -> "host1",
- "exec1" -> "host1",
- "exec2" -> "host2",
- "exec2" -> "host2")) {
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
val taskOption = manager.resourceOffer(k, v, NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
@@ -1480,10 +1484,10 @@ class TaskSetManagerSuite
}
// Offer resources for 4 tasks to start
for ((exec, host) <- Seq(
- "exec1" -> "host1",
- "exec1" -> "host1",
- "exec3" -> "host3",
- "exec2" -> "host2")) {
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec3" -> "host3",
+ "exec2" -> "host2")) {
val taskOption = manager.resourceOffer(exec, host, NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
@@ -1552,10 +1556,10 @@ class TaskSetManagerSuite
}
// Offer resources for 4 tasks to start
for ((k, v) <- List(
- "exec1" -> "host1",
- "exec1" -> "host1",
- "exec2" -> "host2",
- "exec2" -> "host2")) {
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
val taskOption = manager.resourceOffer(k, v, NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
@@ -1655,7 +1659,7 @@ class TaskSetManagerSuite
assert(FakeRackUtil.numBatchInvocation === 1)
}
- test("TaskSetManager allocate resource addresses from available resources") {
+ test("TaskSetManager passes task resource along") {
import TestUtils._
sc = new SparkContext("local", "test")
@@ -1664,15 +1668,13 @@ class TaskSetManagerSuite
val taskSet = FakeTask.createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
- val availableResources = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
- val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF,
availableResources)
+ val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU,
Array("0", "1")))
+ val taskOption =
+ manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments)
assert(taskOption.isDefined)
val allocatedResources = taskOption.get.resources
assert(allocatedResources.size == 1)
assert(allocatedResources(GPU).addresses sameElements Array("0", "1"))
- // Allocated resource addresses should still present in
`availableResources`, they will only
- // get removed inside TaskSchedulerImpl later.
- assert(availableResources(GPU) sameElements Array("0", "1", "2", "3"))
}
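The renamed test above reflects a responsibility shift in this patch: picking concrete resource addresses now happens in TaskSchedulerImpl before the TaskSetManager is consulted, and the manager simply copies the assignments it is handed into the TaskDescription. A hedged sketch of the kind of carve-out the scheduler performs, with a hypothetical helper name and an ArrayBuffer standing in for the offer's free addresses:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.resource.ResourceInformation

    // Take `amountPerTask` addresses out of the offer's pool and package them
    // for the task; the remaining addresses stay available for later tasks.
    def takeAddresses(
        available: ArrayBuffer[String],
        amountPerTask: Int,
        name: String): ResourceInformation = {
      val picked = available.take(amountPerTask).toArray
      available.remove(0, amountPerTask)
      new ResourceInformation(name, picked)
    }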
test("SPARK-26755 Ensure that a speculative task is submitted only once for
execution") {
@@ -1793,15 +1795,16 @@ class TaskSetManagerSuite
numTasks: Int,
numExecutorCores: Int,
numCoresPerTask: Int): (TaskSetManager, ManualClock) = {
- sc = new SparkContext("local", "test")
- sc.conf.set(config.SPECULATION_ENABLED, true)
- sc.conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
+ val conf = new SparkConf()
+ conf.set(config.SPECULATION_ENABLED, true)
+ conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
// Set the number of slots per executor
- sc.conf.set(config.EXECUTOR_CORES.key, numExecutorCores.toString)
- sc.conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString)
+ conf.set(config.EXECUTOR_CORES.key, numExecutorCores.toString)
+ conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString)
if (speculationThresholdOpt.isDefined) {
- sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key,
speculationThresholdOpt.get)
+ conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key,
speculationThresholdOpt.get)
}
+ sc = new SparkContext("local", "test", conf)
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
// Create a task set with the given number of tasks
val taskSet = FakeTask.createTaskSet(numTasks)
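One more note on the speculation-setup change above: values written to sc.conf after the SparkContext is running are not seen by scheduler components that read their settings at construction time, so the helper now builds the SparkConf first and passes it to the constructor. The pattern in isolation, with representative keys:

    import org.apache.spark.{SparkConf, SparkContext}

    // Configure before constructing the context so the scheduler picks up
    // the intended per-task and per-executor settings from the start.
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.executor.cores", "4")
      .set("spark.task.cpus", "2")
    val sc = new SparkContext("local", "test", conf)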
@@ -1890,15 +1893,28 @@ class TaskSetManagerSuite
test("SPARK-30417 when spark.task.cpus is greater than spark.executor.cores
due to " +
"standalone settings, speculate if there is only one task in the stage") {
- val (manager, clock) = testSpeculationDurationSetup(
- Some("60min"),
- // Set the quantile to be 1.0 so that regular speculation would not be
triggered
- speculationQuantile = 1.0,
- numTasks = 1,
- numExecutorCores = 1,
- numCoresPerTask = 2
- )
+ val numTasks = 1
+ val numCoresPerTask = 2
+ val conf = new SparkConf()
+ // skip throwing exception when cores per task > cores per executor to
emulate standalone mode
+ conf.set(SKIP_VALIDATE_CORES_TESTING, true)
+ conf.set(config.SPECULATION_ENABLED, true)
+ conf.set(config.SPECULATION_QUANTILE.key, "1.0")
+ // Skip setting cores per executor to emulate standalone default mode
+ conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString)
+ conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, "60min")
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ // Create a task set with the given number of tasks
+ val taskSet = FakeTask.createTaskSet(numTasks)
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ manager.isZombie = false
+ // Offer resources for the task to start
+ for (i <- 1 to numTasks) {
+ manager.resourceOffer(s"exec$i", s"host$i", NO_PREF)
+ }
clock.advance(1000*60*60)
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == 0)
@@ -1942,7 +1958,8 @@ class TaskSetManagerSuite
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
val tasks = Array.tabulate[Task[_]](2)(partition => new
FakeLongTasks(stageId = 0, partition))
- val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0,
priority = 0, null)
+ val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0,
priority = 0, null,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val stageId = taskSet.stageId
val stageAttemptId = taskSet.stageAttemptId
sched.submitTasks(taskSet)
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e2a9914..f1e3fca 100644
---
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkContext, SparkException,
TaskState}
import org.apache.spark.deploy.mesos.{config => mesosConfig}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.internal.config
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
@@ -457,7 +458,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
super.applicationId
}
- override def maxNumConcurrentTasks(): Int = {
+ override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
// TODO SPARK-25074 support this method for
MesosFineGrainedSchedulerBackend
0
}