This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e33cbde59087 [SPARK-54198][K8S] Delete Kubernetes executor pods only once per event processing interval
e33cbde59087 is described below
commit e33cbde5908716b2099239f16e8d666fbcf36c12
Author: Peter Toth <[email protected]>
AuthorDate: Thu Nov 6 07:46:34 2025 -0800
[SPARK-54198][K8S] Delete Kubernetes executor pods only once per event processing interval
### What changes were proposed in this pull request?
When `ExecutorPodsLifecycleManager` processes the sequence of snapshots in
`onNewSnapshots()`, it maintains the `execIdsRemovedInThisRound` set of executor
ids so that it does not try to delete an executor pod multiple times.
But this logic has a flaw: it depends on `onFinalNonDeletedState()`, which
depends on `removeExecutorFromSpark()`, which in turn depends on whether the
executor has already been added to `removedExecutorsCache`.
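For concreteness, here is a condensed, self-contained model of the pre-fix
bookkeeping that the scenario below walks through. This is a sketch only: the
cache and the Kubernetes delete are stubbed out, and the method bodies below
are illustrative assumptions, not the real implementations.

```scala
import scala.collection.mutable

object PreFixSketch {
  val execIdsRemovedInThisRound = mutable.Set.empty[Long]
  val removedExecutorsCache = mutable.Set.empty[Long] // stands in for the 3-minute cache
  var k8sDeleteAttempts = 0

  // Stub: returns true only if the executor was not yet in the cache,
  // mirroring how removeExecutorFromSpark() drives the return value.
  def onFinalNonDeletedState(execId: Long, deleteFromK8s: Boolean): Boolean = {
    if (deleteFromK8s) k8sDeleteAttempts += 1 // removeExecutorFromK8s() stub
    removedExecutorsCache.add(execId)         // removeExecutorFromSpark() stub
  }

  // Pre-fix shape of the PodFailed/PodSucceeded branches.
  def processSnapshot(execId: Long): Unit = {
    val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
    if (onFinalNonDeletedState(execId, deleteFromK8s)) {
      execIdsRemovedInThisRound += execId // never reached once the cache has execId
    }
  }

  def main(args: Array[String]): Unit = {
    // Round 2 after a failed delete: the executor is already cached.
    removedExecutorsCache += 1L
    Seq(1L, 1L, 1L).foreach(processSnapshot) // three snapshots of the same pod
    assert(k8sDeleteAttempts == 3)           // one delete attempt per snapshot: the bug
  }
}
```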
Consider the following scenario:
1. `onNewSnapshots()` runs and an executor is selected for deletion due to
a `PodFailed` or `PodSucceeded` event.
Because `removedExecutorsCache` (a 3-minute cache) does not yet contain the
executor, `onFinalNonDeletedState()` returns true and the executor is added
to `execIdsRemovedInThisRound`.
Before adding the executor to `execIdsRemovedInThisRound`,
`onFinalNonDeletedState()` calls `removeExecutorFromK8s()` to delete the
Kubernetes pod.
Because the executor is in `execIdsRemovedInThisRound`, the pod deletion is
attempted only once, regardless of how many snapshots we process in
`onNewSnapshots()`.
2. Suppose the pod deletion failed and, one
`spark.kubernetes.executor.eventProcessingInterval` later (1s by default),
`onNewSnapshots()` runs again.
Because the executor is already in `removedExecutorsCache`, it is never
added to `execIdsRemovedInThisRound`, so the pod deletion is attempted once
per snapshot processed in `onNewSnapshots()`.
In our case the initial pod deletion failed because the Kubernetes API was
being flooded, so we issued more and more deletes... The fixed bookkeeping
is sketched after this list.
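The fix (visible in the diff below) bases `deleteFromK8s` on
`scala.collection.mutable.Set#add`, which returns `true` only when the element
was not already present, so the once-per-round guard no longer depends on
`removedExecutorsCache`. Reworking the sketch above with the fixed bookkeeping
(same stubs and assumptions):

```scala
import scala.collection.mutable

object PostFixSketch {
  val execIdsRemovedInThisRound = mutable.Set.empty[Long]
  var k8sDeleteAttempts = 0

  // Post-fix shape: deleteFromK8s is decided solely by whether this round
  // has already seen the executor, independent of the removal cache.
  def processSnapshot(execId: Long): Unit = {
    val deleteFromK8s = execIdsRemovedInThisRound.add(execId) // true only once per round
    if (deleteFromK8s) k8sDeleteAttempts += 1                 // removeExecutorFromK8s() stub
  }

  def main(args: Array[String]): Unit = {
    // The same three snapshots of the same pod, cached or not:
    Seq(1L, 1L, 1L).foreach(processSnapshot)
    assert(k8sDeleteAttempts == 1) // a single delete attempt per round
  }
}
```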
### Why are the changes needed?
It fixes the scenario described above: a pod deletion is attempted at most
once per event processing interval.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new unit test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52899 from peter-toth/SPARK-54198-fix-multiple-pod-deletes.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 2ec7439f51f310cff2339c43648100d07318e6ee)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../cluster/k8s/ExecutorPodsLifecycleManager.scala | 8 +++-----
.../cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala | 16 ++++++++++++++++
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 2ad01229f8f2..3a508add6ccf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -113,25 +113,23 @@ private[spark] class ExecutorPodsLifecycleManager(
inactivatedPods -= execId
case deleted@PodDeleted(_) =>
+ execIdsRemovedInThisRound += execId
if (removeExecutorFromSpark(schedulerBackend, deleted, execId)) {
- execIdsRemovedInThisRound += execId
logDebug(s"Snapshot reported deleted executor with id $execId," +
s" pod name ${state.pod.getMetadata.getName}")
}
inactivatedPods -= execId
case failed@PodFailed(_) =>
- val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+ val deleteFromK8s = execIdsRemovedInThisRound.add(execId)
if (onFinalNonDeletedState(failed, execId, schedulerBackend, deleteFromK8s)) {
- execIdsRemovedInThisRound += execId
logDebug(s"Snapshot reported failed executor with id $execId," +
s" pod name ${state.pod.getMetadata.getName}")
}
case succeeded@PodSucceeded(_) =>
- val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+ val deleteFromK8s = execIdsRemovedInThisRound.add(execId)
if (onFinalNonDeletedState(succeeded, execId, schedulerBackend, deleteFromK8s)) {
- execIdsRemovedInThisRound += execId
if (schedulerBackend.isExecutorActive(execId.toString)) {
logInfo(log"Snapshot reported succeeded executor with id " +
log"${MDC(LogKeys.EXECUTOR_ID, execId)}, even though the
application has not " +
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 7d81cc0ae16c..cdbcae050ceb 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -244,6 +244,22 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
verify(mockPodResource, never()).delete()
}
+ test("SPARK-54198: Delete Kubernetes executor pods only once per event
processing interval") {
+ val failedPod = failedExecutorWithoutDeletion(1)
+ val mockPodResource = mock(classOf[PodResource])
+ namedExecutorPods.put("spark-executor-1", mockPodResource)
+ when(mockPodResource.get()).thenReturn(failedPod)
+ snapshotsStore.updatePod(failedPod)
+ snapshotsStore.notifySubscribers()
+ snapshotsStore.updatePod(failedPod)
+ snapshotsStore.updatePod(failedPod)
+ snapshotsStore.notifySubscribers()
+ val msg = exitReasonMessage(1, failedPod, 1)
+ val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+ verify(schedulerBackend, times(1)).doRemoveExecutor("1", expectedLossReason)
+ verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete()
+ }
+
private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): String = {
val reason = Option(failedPod.getStatus.getReason)
val message = Option(failedPod.getStatus.getMessage)