This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3b368ca9715c [SPARK-54197][K8S] Improve
`ExecutorsPodsLifecycleManager` not to request to delete if `deletionTimestamp`
exists
3b368ca9715c is described below
commit 3b368ca9715c741556bcd38cbe662aeebb79c64d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Nov 5 17:54:27 2025 -0800
[SPARK-54197][K8S] Improve `ExecutorsPodsLifecycleManager` not to request
to delete if `deletionTimestamp` exists
### What changes were proposed in this pull request?
The current code handling deletion of Failed or Succeeded driver Pods is
calling the Kubernetes API to delete objects until the Kubelet has
started the termination of the Pod (the status of the object is terminating).
However, depending on configuration, the ExecutorPodsLifecycleManager loop
might run multiple times before the Kubelet starts the deletion of the Pod
object, resulting in un-necessary DELETE calls to the Kubernetes API, which are
particularly expensive since they are served from Etcd.
Following the Kubernetes API specifications in
https://kubernetes.io/docs/reference/using-api/api-concepts/
> When a client first sends a delete to request the removal of a resource,
the .metadata.deletionTimestamp is set to the current time. Once the
.metadata.deletionTimestamp is set, external controllers that act on finalizers
may start performing their cleanup work at any time, in any order.
we can assume that whenever the deletionTimestamp is set on a Pod, the Pod
will eventually be terminated without the need for additional DELETE calls.
### Why are the changes needed?
This change is required to remove the need of redundant API calls against
the Kubernetes API that at scale might lead to excessive load against Etcd.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This patch includes unit-tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52898
Closes #52902 from
dongjoon-hyun/driver-do-not-call-delete-for-terminating-pods-master.
Lead-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Andrea Tosatto <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../cluster/k8s/ExecutorPodsLifecycleManager.scala | 16 ++++++++++---
.../k8s/ExecutorPodsLifecycleManagerSuite.scala | 27 +++++++++++++++++++++-
2 files changed, 39 insertions(+), 4 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 9aa99a6c5984..2ad01229f8f2 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -201,8 +201,16 @@ private[spark] class ExecutorPodsLifecycleManager(
private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = {
Utils.tryLogNonFatalError {
if (shouldDeleteExecutors) {
- // Get pod before deleting it, we can skip deleting if pod is already
deleted so that
- // we do not send too many requests to api server.
+ if (updatedPod.getMetadata.getDeletionTimestamp != null) {
+ // Do not call the Kubernetes API if the deletion timestamp
+ // is already set on the updatedPod object.
+ // This is removing the need for un-necessary API roundtrips
+ // against the Kubernetes API.
+ return
+ }
+ // Get pod before deleting it, we can skip deleting if pod is already
deleted
+ // or has already the deletion timestamp set so that we do not send
+ // too many requests to api server.
// If deletion failed on a previous try, we can try again if resync
informs us the pod
// is still around.
// Delete as best attempt - duplicate deletes will throw an exception
but the end state
@@ -211,7 +219,9 @@ private[spark] class ExecutorPodsLifecycleManager(
.pods()
.inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
- if (podToDelete.get() != null) {
+
+ if (podToDelete.get() != null &&
+ podToDelete.get.getMetadata.getDeletionTimestamp == null) {
podToDelete.delete()
}
} else if (!inactivatedPods.contains(execId) &&
!isPodInactive(updatedPod)) {
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 4c7ffe692b10..7d81cc0ae16c 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -20,7 +20,7 @@ import java.util.function.UnaryOperator
import scala.collection.mutable
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{Mock, MockitoAnnotations}
@@ -219,6 +219,31 @@ class ExecutorPodsLifecycleManagerSuite extends
SparkFunSuite with BeforeAndAfte
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
}
+ test("Don't delete pod from K8s if deletionTimestamp is already set.") {
+ // Create a failed pod with deletionTimestamp already in the past
+ val basePod = failedExecutorWithoutDeletion(1)
+ val failedPodWithDeletionTimestamp = new PodBuilder(basePod)
+ .editOrNewMetadata()
+ .withDeletionTimestamp("1970-01-01T00:00:00Z")
+ .endMetadata()
+ .build()
+
+ val mockPodResource = mock(classOf[PodResource])
+ namedExecutorPods.put("spark-executor-1", mockPodResource)
+ when(mockPodResource.get()).thenReturn(failedPodWithDeletionTimestamp)
+
+ snapshotsStore.updatePod(failedPodWithDeletionTimestamp)
+ snapshotsStore.notifySubscribers()
+
+ // Verify executor is removed from Spark
+ val msg = "The executor with id 1 was deleted by a user or the framework."
+ val expectedLossReason = ExecutorExited(1, exitCausedByApp = false, msg)
+ verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+
+ // Verify delete() is NOT called since deletionTimestamp is already set
+ verify(mockPodResource, never()).delete()
+ }
+
private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int):
String = {
val reason = Option(failedPod.getStatus.getReason)
val message = Option(failedPod.getStatus.getMessage)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]