This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new fb16a1ea6fef [SPARK-54197][K8S] Improve 
`ExecutorPodsLifecycleManager` not to request to delete if `deletionTimestamp` 
exists
fb16a1ea6fef is described below

commit fb16a1ea6fef6b65a73716362679f4645b8d6e29
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Nov 5 17:54:27 2025 -0800

    [SPARK-54197][K8S] Improve `ExecutorPodsLifecycleManager` not to request 
to delete if `deletionTimestamp` exists
    
    ### What changes were proposed in this pull request?
    
    The current code handling deletion of Failed or Succeeded driver Pods is 
calling the Kubernetes API to delete objects until the Kubelet has 
started the termination of the Pod (the status of the object is terminating).
    
    However, depending on configuration, the ExecutorPodsLifecycleManager loop 
might run multiple times before the Kubelet starts the deletion of the Pod 
object, resulting in unnecessary DELETE calls to the Kubernetes API, which are 
particularly expensive since they are served from Etcd.
    
    Following the Kubernetes API specifications in 
https://kubernetes.io/docs/reference/using-api/api-concepts/
    
    > When a client first sends a delete to request the removal of a resource, 
the .metadata.deletionTimestamp is set to the current time. Once the 
.metadata.deletionTimestamp is set, external controllers that act on finalizers 
may start performing their cleanup work at any time, in any order.
    
    we can assume that whenever the deletionTimestamp is set on a Pod, the Pod 
will eventually be terminated without the need for additional DELETE calls.
    
    ### Why are the changes needed?
    
    This change is required to avoid redundant API calls against 
the Kubernetes API that at scale might lead to excessive load on Etcd.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This patch includes unit-tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52898
    
    Closes #52902 from 
dongjoon-hyun/driver-do-not-call-delete-for-terminating-pods-master.
    
    Lead-authored-by: Dongjoon Hyun <[email protected]>
    Co-authored-by: Andrea Tosatto <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 3b368ca9715c741556bcd38cbe662aeebb79c64d)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala | 16 ++++++++++---
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala    | 27 +++++++++++++++++++++-
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 9aa99a6c5984..2ad01229f8f2 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -201,8 +201,16 @@ private[spark] class ExecutorPodsLifecycleManager(
   private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = {
     Utils.tryLogNonFatalError {
       if (shouldDeleteExecutors) {
-        // Get pod before deleting it, we can skip deleting if pod is already 
deleted so that
-        // we do not send too many requests to api server.
+        if (updatedPod.getMetadata.getDeletionTimestamp != null) {
+          // Do not call the Kubernetes API if the deletion timestamp
+          // is already set on the updatedPod object.
+          // This is removing the need for un-necessary API roundtrips
+          // against the Kubernetes API.
+          return
+        }
+        // Get pod before deleting it, we can skip deleting if pod is already 
deleted
+        // or has already the deletion timestamp set so that we do not send
+        // too many requests to api server.
         // If deletion failed on a previous try, we can try again if resync 
informs us the pod
         // is still around.
         // Delete as best attempt - duplicate deletes will throw an exception 
but the end state
@@ -211,7 +219,9 @@ private[spark] class ExecutorPodsLifecycleManager(
           .pods()
           .inNamespace(namespace)
           .withName(updatedPod.getMetadata.getName)
-        if (podToDelete.get() != null) {
+
+        if (podToDelete.get() != null &&
+            podToDelete.get.getMetadata.getDeletionTimestamp == null) {
           podToDelete.delete()
         }
       } else if (!inactivatedPods.contains(execId) && 
!isPodInactive(updatedPod)) {
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 4c7ffe692b10..7d81cc0ae16c 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -20,7 +20,7 @@ import java.util.function.UnaryOperator
 
 import scala.collection.mutable
 
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
@@ -219,6 +219,31 @@ class ExecutorPodsLifecycleManagerSuite extends 
SparkFunSuite with BeforeAndAfte
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
+  test("Don't delete pod from K8s if deletionTimestamp is already set.") {
+    // Create a failed pod with deletionTimestamp already in the past
+    val basePod = failedExecutorWithoutDeletion(1)
+    val failedPodWithDeletionTimestamp = new PodBuilder(basePod)
+      .editOrNewMetadata()
+        .withDeletionTimestamp("1970-01-01T00:00:00Z")
+      .endMetadata()
+      .build()
+
+    val mockPodResource = mock(classOf[PodResource])
+    namedExecutorPods.put("spark-executor-1", mockPodResource)
+    when(mockPodResource.get()).thenReturn(failedPodWithDeletionTimestamp)
+
+    snapshotsStore.updatePod(failedPodWithDeletionTimestamp)
+    snapshotsStore.notifySubscribers()
+
+    // Verify executor is removed from Spark
+    val msg = "The executor with id 1 was deleted by a user or the framework."
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = false, msg)
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+
+    // Verify delete() is NOT called since deletionTimestamp is already set
+    verify(mockPodResource, never()).delete()
+  }
+
   private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): 
String = {
     val reason = Option(failedPod.getStatus.getReason)
     val message = Option(failedPod.getStatus.getMessage)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to