This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9b465a2a9adf [SPARK-53324][K8S] Introduce pending pod limit per ResourceProfile
9b465a2a9adf is described below
commit 9b465a2a9adfd286416e7e2203c882d3e724e057
Author: ForVic <[email protected]>
AuthorDate: Thu Oct 23 13:21:30 2025 -0700
[SPARK-53324][K8S] Introduce pending pod limit per ResourceProfile
### What changes were proposed in this pull request?
This PR introduces a limit on pending pods (including newly created/requested executors) per resource profile. A config already exists for a global limit across all resource profiles; here we add a limit per resource profile. #33492 already does most of the plumbing for us by counting newly created and pending pods, so we can pass the pending pod count per resource profile through and cap the number of pod requests for that resource profile at min(previousRequest, maxPodsPerRP).
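
A minimal sketch of how this cap combines with the existing bounds (variable names follow the ExecutorPodsAllocator diff below; the concrete numbers are illustrative only):

```scala
// One allocation round for a single resource profile (hypothetical values).
val maxPendingPodsPerRpid = 2        // spark.kubernetes.allocation.maxPendingPodsPerRp
val pendingPodCountForRpId = 1       // pending + newly created pods for this profile
val numMissingPodsForRpId = 5        // target executors minus current pod count
val podAllocationSize = 5            // spark.kubernetes.allocation.batch.size
val sharedSlotFromPendingPods = 10   // slots left under the global spark.kubernetes.allocation.maxPendingPods

val remainingSlotsForRpId = maxPendingPodsPerRpid - pendingPodCountForRpId
val numExecutorsToAllocate = Seq(numMissingPodsForRpId, podAllocationSize,
  sharedSlotFromPendingPods, remainingSlotsForRpId).min   // == 1, capped by the per-profile limit
```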
### Why are the changes needed?
For use cases with multiple resource profiles, you can set limits that apply at the resource profile level instead of only globally.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests added.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51913 from ForVic/vsunderl/max_pending_pods_per_rpid.
Authored-by: ForVic <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 12 ++++
.../cluster/k8s/ExecutorPodsAllocator.scala | 20 +++++--
.../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 68 ++++++++++++++++++++++
3 files changed, 96 insertions(+), 4 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 3e21fc14e6e6..80ac2d02e347 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -778,6 +778,18 @@ private[spark] object Config extends Logging {
      .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
      .createWithDefault(Int.MaxValue)
+  val KUBERNETES_MAX_PENDING_PODS_PER_RPID =
+    ConfigBuilder("spark.kubernetes.allocation.maxPendingPodsPerRp")
+      .doc("Maximum number of pending PODs allowed per resource profile ID during executor " +
+        "allocation. This provides finer-grained control over pending pods by limiting them " +
+        "per resource profile rather than globally. When set, this limit is enforced " +
+        "independently for each resource profile ID.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(value => value > 0,
+        "Maximum number of pending pods per rp id should be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
val KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD =
ConfigBuilder("spark.kubernetes.executorSnapshotsSubscribersShutdownGracePeriod")
.doc("Time to wait for graceful shutdown
kubernetes-executor-snapshots-subscribers " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 48e48cc3fbc7..bd1a65fa0255 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -73,6 +73,15 @@ class ExecutorPodsAllocator(
protected val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
+  protected val maxPendingPodsPerRpid = conf.get(KUBERNETES_MAX_PENDING_PODS_PER_RPID)
+
+  // If maxPendingPodsPerRpid is set, ensure it's not greater than maxPendingPods
+  if (maxPendingPodsPerRpid != Int.MaxValue) {
+    require(maxPendingPodsPerRpid <= maxPendingPods,
+      s"Maximum pending pods per resource profile ID ($maxPendingPodsPerRpid) must be less than " +
+      s"or equal to maximum pending pods ($maxPendingPods).")
+  }
+
protected val podCreationTimeout = math.max(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -350,7 +359,7 @@ class ExecutorPodsAllocator(
}
}
        if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
- Some(rpId, podCountForRpId, targetNum)
+ Some(rpId, podCountForRpId, targetNum, notRunningPodCountForRpId)
} else {
// for this resource profile we do not request more PODs
None
@@ -364,10 +373,13 @@ class ExecutorPodsAllocator(
if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0 &&
        !(snapshots.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get())) {
      ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
-        .foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
+        .foreach { case ((rpId, podCountForRpId, targetNum, pendingPodCountForRpId),
+          sharedSlotFromPendingPods) =>
+          val remainingSlotsForRpId = maxPendingPodsPerRpid - pendingPodCountForRpId
val numMissingPodsForRpId = targetNum - podCountForRpId
-          val numExecutorsToAllocate =
-            math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
+          val numExecutorsToAllocate = Seq(numMissingPodsForRpId, podAllocationSize,
+            sharedSlotFromPendingPods, remainingSlotsForRpId).min
+
logInfo(log"Going to request ${MDC(LogKeys.COUNT,
numExecutorsToAllocate)} executors from" +
log" Kubernetes for ResourceProfile Id:
${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}, " +
log"target: ${MDC(LogKeys.NUM_POD_TARGET, targetNum)}, " +
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 755883efce11..244a8c96d23f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -238,6 +238,74 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
verify(labeledPods, times(1)).delete()
}
+ test("pending pod limit per resource profile ID") {
+ when(podOperations
+ .withField("status.phase", "Pending"))
+ .thenReturn(podOperations)
+ when(podOperations
+ .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+ .thenReturn(podOperations)
+ when(podOperations
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+ .thenReturn(podOperations)
+ when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any(classOf[Array[String]]): _*))
+ .thenReturn(podOperations)
+
+ val startTime = Instant.now.toEpochMilli
+ waitForExecutorPodsClock.setTime(startTime)
+
+ // Two resource profiles, default and rp
+ val rpb = new ResourceProfileBuilder()
+ val ereq = new ExecutorResourceRequests()
+ val treq = new TaskResourceRequests()
+ ereq.cores(4).memory("2g")
+ treq.cpus(2)
+ rpb.require(ereq).require(treq)
+ val rp = rpb.build()
+
+ val confWithLowMaxPendingPodsPerRpId = conf.clone
+ .set(KUBERNETES_MAX_PENDING_PODS_PER_RPID.key, "2")
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPodsPerRpId, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+ podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+ // Request more than the max per rp for one rp
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
+ // 2 for default, and 2 for rp
+ assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
+ verify(podResource, times(4)).create()
+
+    // Mark executors 2 and 3 as pending, leaving two as newly created; this does not free up
+    // any pending pod slot, so no new pod is requested
+ snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+ snapshotsStore.updatePod(pendingExecutor(3, rp.id))
+ snapshotsStore.notifySubscribers()
+ assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
+ verify(podResource, times(4)).create()
+ verify(labeledPods, never()).delete()
+
+    // Downscale the defaultProfile resource ID by 1 executor to make one free slot for pending
+    // pods; the non-default rp should still be limited by the max pending pods per rp.
+ waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
+ snapshotsStore.notifySubscribers()
+ assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+ verify(labeledPods, times(1)).delete()
+
+    // Make one pod running from non-default rp so we have one more slot for pending pods.
+ snapshotsStore.updatePod(runningExecutor(3, rp.id))
+ snapshotsStore.updatePod(pendingExecutor(4, rp.id))
+ snapshotsStore.notifySubscribers()
+ assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
+ verify(labeledPods, times(1)).delete()
+ }
+
test("Initially request executors in batches. Do not request another batch
if the" +
" first has not finished.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile ->
(podAllocationSize + 1)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]