This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b465a2a9adf [SPARK-53324][K8S] Introduce pending pod limit per 
ResourceProfile
9b465a2a9adf is described below

commit 9b465a2a9adfd286416e7e2203c882d3e724e057
Author: ForVic <[email protected]>
AuthorDate: Thu Oct 23 13:21:30 2025 -0700

    [SPARK-53324][K8S] Introduce pending pod limit per ResourceProfile
    
    ### What changes were proposed in this pull request?
    Introducing a limit for pending PODs (newly created/requested executors 
included) per resource profile.
    There exists a config for a global limit for all resource profiles, but 
here we add a limit per resource profile. #33492 does a lot of the plumbing for 
us already, counting newly created and pending pods, and we can just pass 
through the pending pods per resource profile, and limit the number of requests 
we were going to make for pods for that resource profile to 
min(previousRequest, maxPodsPerRP).
    
    ### Why are the changes needed?
    For multiple resource profile use cases you can set limits that apply at 
the resource profile level, instead of globally.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    unit tests added
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51913 from ForVic/vsunderl/max_pending_pods_per_rpid.
    
    Authored-by: ForVic <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 12 ++++
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 20 +++++--
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 68 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 4 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 3e21fc14e6e6..80ac2d02e347 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -778,6 +778,18 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 0, "Maximum number of pending pods should 
be a positive integer")
       .createWithDefault(Int.MaxValue)
 
+  val KUBERNETES_MAX_PENDING_PODS_PER_RPID =
+    ConfigBuilder("spark.kubernetes.allocation.maxPendingPodsPerRp")
+      .doc("Maximum number of pending PODs allowed per resource profile ID 
during executor " +
+        "allocation. This provides finer-grained control over pending pods by 
limiting them " +
+        "per resource profile rather than globally. When set, this limit is 
enforced " +
+        "independently for each resource profile ID.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(value => value > 0,
+        "Maximum number of pending pods per rp id should be a positive 
integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD =
     
ConfigBuilder("spark.kubernetes.executorSnapshotsSubscribersShutdownGracePeriod")
       .doc("Time to wait for graceful shutdown 
kubernetes-executor-snapshots-subscribers " +
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 48e48cc3fbc7..bd1a65fa0255 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -73,6 +73,15 @@ class ExecutorPodsAllocator(
 
   protected val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
 
+  protected val maxPendingPodsPerRpid = 
conf.get(KUBERNETES_MAX_PENDING_PODS_PER_RPID)
+
+  // If maxPendingPodsPerRpid is set, ensure it's not greater than 
maxPendingPods
+  if (maxPendingPodsPerRpid != Int.MaxValue) {
+    require(maxPendingPodsPerRpid <= maxPendingPods,
+      s"Maximum pending pods per resource profile ID ($maxPendingPodsPerRpid) 
must be less than " +
+        s"or equal to maximum pending pods ($maxPendingPods).")
+  }
+
   protected val podCreationTimeout = math.max(
     podAllocationDelay * 5,
     conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -350,7 +359,7 @@ class ExecutorPodsAllocator(
         }
       }
       if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) 
{
-        Some(rpId, podCountForRpId, targetNum)
+        Some(rpId, podCountForRpId, targetNum, notRunningPodCountForRpId)
       } else {
         // for this resource profile we do not request more PODs
         None
@@ -364,10 +373,13 @@ class ExecutorPodsAllocator(
     if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0 &&
         !(snapshots.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get())) 
{
       ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, 
remainingSlotFromPendingPods)
-        .foreach { case ((rpId, podCountForRpId, targetNum), 
sharedSlotFromPendingPods) =>
+        .foreach { case ((rpId, podCountForRpId, targetNum, 
pendingPodCountForRpId),
+            sharedSlotFromPendingPods) =>
+        val remainingSlotsForRpId = maxPendingPodsPerRpid - 
pendingPodCountForRpId
         val numMissingPodsForRpId = targetNum - podCountForRpId
-        val numExecutorsToAllocate =
-          math.min(math.min(numMissingPodsForRpId, podAllocationSize), 
sharedSlotFromPendingPods)
+        val numExecutorsToAllocate = Seq(numMissingPodsForRpId, 
podAllocationSize,
+          sharedSlotFromPendingPods, remainingSlotsForRpId).min
+
         logInfo(log"Going to request ${MDC(LogKeys.COUNT, 
numExecutorsToAllocate)} executors from" +
           log" Kubernetes for ResourceProfile Id: 
${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}, " +
           log"target: ${MDC(LogKeys.NUM_POD_TARGET, targetNum)}, " +
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 755883efce11..244a8c96d23f 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -238,6 +238,74 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
     verify(labeledPods, times(1)).delete()
   }
 
+  test("pending pod limit per resource profile ID") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any(classOf[Array[String]]): 
_*))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    // Two resource profiles, default and rp
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build()
+
+    val confWithLowMaxPendingPodsPerRpId = conf.clone
+      .set(KUBERNETES_MAX_PENDING_PODS_PER_RPID.key, "2")
+    podsAllocatorUnderTest = new 
ExecutorPodsAllocator(confWithLowMaxPendingPodsPerRpId, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, 
waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    // Request more than the max per rp for one rp
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, 
rp -> 3))
+    // 2 for default, and 2 for rp
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, 
defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, 
defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
+    verify(podResource, times(4)).create()
+
+    // Mark executor 2 and 3 as pending, leaving 2 as newly created but this 
does not free up
+    // any pending pod slot so no new pod is requested
+    snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(3, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
+    verify(podResource, times(4)).create()
+    verify(labeledPods, never()).delete()
+
+    // Downscaling for defaultProfile resource ID with 1 executor to make one 
free slot
+    // for pendings pods, the non default should still be limited by the max 
pending pods per rp.
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, 
rp -> 3))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(labeledPods, times(1)).delete()
+
+    // Make one pod running from non-default rp so we have one more slot for 
pending pods.
+    snapshotsStore.updatePod(runningExecutor(3, rp.id))
+    snapshotsStore.updatePod(pendingExecutor(4, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
+    verify(labeledPods, times(1)).delete()
+  }
+
   test("Initially request executors in batches. Do not request another batch 
if the" +
     " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 
(podAllocationSize + 1)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to