This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new e33cbde59087 [SPARK-54198][K8S] Delete Kubernetes executor pods only 
once per event processing interval
e33cbde59087 is described below

commit e33cbde5908716b2099239f16e8d666fbcf36c12
Author: Peter Toth <[email protected]>
AuthorDate: Thu Nov 6 07:46:34 2025 -0800

    [SPARK-54198][K8S] Delete Kubernetes executor pods only once per event 
processing interval
    
    ### What changes were proposed in this pull request?
    
    When `ExecutorPodsLifecycleManager` processes the sequence of snapshots in 
`onNewSnapshots()` it maintains the `execIdsRemovedInThisRound` set of executor 
ids to not try deleting an executor pod multiple times.
    But this logic seems to have a flaw because it depends on 
`onFinalNonDeletedState()`, which depends on `removeExecutorFromSpark()`, which 
depends on if the executor has been added to `removedExecutorsCache`.
    
    Consider the following scenario:
    1. `onNewSnapshots()` runs and an executor is selected for deletion due to 
a `PodFailed` or `PodSucceeded` event.
      Because `removedExecutorsCache` (3 minutes cache) doesn't contain the 
executor `onFinalNonDeletedState()` returns with true and the executor is added 
to `execIdsRemovedInThisRound`.
      Before adding the executor to `execIdsRemovedInThisRound` 
`onFinalNonDeletedState()` calls `removeExecutorFromK8s()` to delete the 
Kubernetes pod.
      Due to the executor is in `execIdsRemovedInThisRound` the pod deletion is 
tried only once regardless how many snapshots we process in `onNewSnapshots()`.
    2. Let's suppose the pod deletion failed and 
`spark.kubernetes.executor.eventProcessingInterval` later (1s by default) 
`onNewSnapshots()` runs again.
      Because the executor is already in `removedExecutorsCache`, it is never 
added to `execIdsRemovedInThisRound`, which results in trying to delete the pod 
as many times as the number of snapshots we process in `onNewSnapshots()`.
    
    In our case the pod initial deletion failed due to flooding the kubernetes 
API so we issued more and more deletes...
    
    ### Why are the changes needed?
    
    Fix the above scenario.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new UT.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52899 from peter-toth/SPARK-54198-fix-multiple-pod-deletes.
    
    Authored-by: Peter Toth <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 2ec7439f51f310cff2339c43648100d07318e6ee)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala       |  8 +++-----
 .../cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala  | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 2ad01229f8f2..3a508add6ccf 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -113,25 +113,23 @@ private[spark] class ExecutorPodsLifecycleManager(
             inactivatedPods -= execId
 
           case deleted@PodDeleted(_) =>
+            execIdsRemovedInThisRound += execId
             if (removeExecutorFromSpark(schedulerBackend, deleted, execId)) {
-              execIdsRemovedInThisRound += execId
               logDebug(s"Snapshot reported deleted executor with id $execId," +
                 s" pod name ${state.pod.getMetadata.getName}")
             }
             inactivatedPods -= execId
 
           case failed@PodFailed(_) =>
-            val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+            val deleteFromK8s = execIdsRemovedInThisRound.add(execId)
             if (onFinalNonDeletedState(failed, execId, schedulerBackend, 
deleteFromK8s)) {
-              execIdsRemovedInThisRound += execId
               logDebug(s"Snapshot reported failed executor with id $execId," +
                 s" pod name ${state.pod.getMetadata.getName}")
             }
 
           case succeeded@PodSucceeded(_) =>
-            val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+            val deleteFromK8s = execIdsRemovedInThisRound.add(execId)
             if (onFinalNonDeletedState(succeeded, execId, schedulerBackend, 
deleteFromK8s)) {
-              execIdsRemovedInThisRound += execId
               if (schedulerBackend.isExecutorActive(execId.toString)) {
                 logInfo(log"Snapshot reported succeeded executor with id " +
                   log"${MDC(LogKeys.EXECUTOR_ID, execId)}, even though the 
application has not " +
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 7d81cc0ae16c..cdbcae050ceb 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -244,6 +244,22 @@ class ExecutorPodsLifecycleManagerSuite extends 
SparkFunSuite with BeforeAndAfte
     verify(mockPodResource, never()).delete()
   }
 
+  test("SPARK-54198: Delete Kubernetes executor pods only once per event 
processing interval") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    val mockPodResource = mock(classOf[PodResource])
+    namedExecutorPods.put("spark-executor-1", mockPodResource)
+    when(mockPodResource.get()).thenReturn(failedPod)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod, 1)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend, times(1)).doRemoveExecutor("1", 
expectedLossReason)
+    verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete()
+  }
+
   private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): 
String = {
     val reason = Option(failedPod.getStatus.getReason)
     val message = Option(failedPod.getStatus.getMessage)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to