This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b8ccd75  [SPARK-29905][K8S] Improve pod lifecycle manager behavior 
with dynamic allocation
b8ccd75 is described below

commit b8ccd755244d3cd8a81a9f4a1eafa2a4e48759d2
Author: Marcelo Vanzin <[email protected]>
AuthorDate: Thu Apr 16 14:15:10 2020 -0700

    [SPARK-29905][K8S] Improve pod lifecycle manager behavior with dynamic 
allocation
    
    This issue mainly shows up when you enable dynamic allocation:
    with so many executor state changes (executors being requested
    and starting to run, and later stopped), the lifecycle manager
    class could end up logging information about the same executor
    multiple times, since the different events would cause the same
    executor update to be present in multiple pod snapshots. On top of that,
    it could end up making multiple redundant calls into the API server
    for the same pod.
    
    Another issue was when the config was set to not delete executor
    pods; with dynamic allocation, that means pods keep accumulating
    in the API server, and every time the full sync is done by the
    polling source, all executors, even the finished ones that Spark
    technically does not care about anymore, would be processed.
    
    The change modifies the lifecycle monitor so that it:
    
    - logs each executor update a single time, even if it shows up in
      multiple snapshots, by checking whether the state change
      has already been seen.
    - marks finished-but-not-deleted-in-k8s executors with a label
      so that they can be easily filtered out.
    
    This reduces the amount of logging done by the lifecycle manager,
    which is a minor thing in general since the logs are at debug level.
    But it also reduces the amount of data that needs to be fetched
    from the API server under certain configurations, and overall
    reduces interaction with the API server when dynamic allocation is on.
    
    There's also a change in the snapshot store to ensure that the
    same subscriber is not called concurrently. Concurrent calls are
    kind of a bug, since they mean subscribers could be processing
    snapshots out of order, or even that they could block multiple
    threads (e.g. the allocator callback was synchronized). I actually
    ran into the "concurrent calls" situation in the lifecycle manager
    during testing, and while it did not seem to cause problems, it did
    make for some head scratching while looking at the logs. It seemed
    safer to fix that.
    
    Unit tests were updated to check for the changes. Also tested in a
    real cluster with dynamic allocation on.
    
    Closes #26535 from vanzin/SPARK-29905.
    
    Lead-authored-by: Marcelo Vanzin <[email protected]>
    Co-authored-by: Marcelo Vanzin <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/deploy/k8s/Constants.scala    |   1 +
 .../cluster/k8s/ExecutorPodsAllocator.scala        |   2 +-
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala | 156 ++++++++++++++-------
 .../k8s/ExecutorPodsPollingSnapshotSource.scala    |   1 +
 .../k8s/ExecutorPodsSnapshotsStoreImpl.scala       |  87 +++++++++---
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala    |  12 +-
 .../ExecutorPodsPollingSnapshotSourceSuite.scala   |   8 +-
 7 files changed, 188 insertions(+), 79 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index a3c74ff7..759c205 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -24,6 +24,7 @@ private[spark] object Constants {
   val SPARK_ROLE_LABEL = "spark-role"
   val SPARK_POD_DRIVER_ROLE = "driver"
   val SPARK_POD_EXECUTOR_ROLE = "executor"
+  val SPARK_EXECUTOR_INACTIVE_LABEL = "spark-exec-inactive"
 
   // Credentials secrets
   val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index b394f35..b6ea1fa 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -94,7 +94,7 @@ private[spark] class ExecutorPodsAllocator(
 
   private def onNewSnapshots(
       applicationId: String,
-      snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
+      snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
     newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
     // For all executors we've created against the API but have not seen in a 
snapshot
     // yet - check the current time. If the current time has exceeded some 
threshold,
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index d6b7582..5d91e52 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -17,13 +17,14 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import com.google.common.cache.Cache
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorExited
@@ -41,11 +42,14 @@ private[spark] class ExecutorPodsLifecycleManager(
 
   import ExecutorPodsLifecycleManager._
 
-  private val eventProcessingInterval = 
conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
-
   private lazy val shouldDeleteExecutors = 
conf.get(KUBERNETES_DELETE_EXECUTORS)
 
+  // Keep track of which pods are inactive to avoid contacting the API server 
multiple times.
+  // This set is cleaned up when a snapshot containing the updated pod is 
processed.
+  private val inactivatedPods = mutable.HashSet.empty[Long]
+
   def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    val eventProcessingInterval = 
conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
     snapshotsStore.addSubscriber(eventProcessingInterval) {
       onNewSnapshots(schedulerBackend, _)
     }
@@ -58,56 +62,78 @@ private[spark] class ExecutorPodsLifecycleManager(
     snapshots.foreach { snapshot =>
       snapshot.executorPods.foreach { case (execId, state) =>
         state match {
+          case _state if isPodInactive(_state.pod) =>
+            inactivatedPods -= execId
+
           case deleted@PodDeleted(_) =>
-            logDebug(s"Snapshot reported deleted executor with id $execId," +
-              s" pod name ${state.pod.getMetadata.getName}")
-            removeExecutorFromSpark(schedulerBackend, deleted, execId)
-            execIdsRemovedInThisRound += execId
+            if (removeExecutorFromSpark(schedulerBackend, deleted, execId)) {
+              execIdsRemovedInThisRound += execId
+              logDebug(s"Snapshot reported deleted executor with id $execId," +
+                s" pod name ${state.pod.getMetadata.getName}")
+            }
+            inactivatedPods -= execId
+
           case failed@PodFailed(_) =>
-            logDebug(s"Snapshot reported failed executor with id $execId," +
-              s" pod name ${state.pod.getMetadata.getName}")
-            onFinalNonDeletedState(failed, execId, schedulerBackend, 
execIdsRemovedInThisRound)
+            val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+            if (onFinalNonDeletedState(failed, execId, schedulerBackend, 
deleteFromK8s)) {
+              execIdsRemovedInThisRound += execId
+              logDebug(s"Snapshot reported failed executor with id $execId," +
+                s" pod name ${state.pod.getMetadata.getName}")
+            }
+
           case succeeded@PodSucceeded(_) =>
-            if (schedulerBackend.isExecutorActive(execId.toString)) {
-              logInfo(s"Snapshot reported succeeded executor with id $execId, 
" +
-                "even though the application has not requested for it to be 
removed.")
-            } else {
-              logDebug(s"Snapshot reported succeeded executor with id 
$execId," +
-                s" pod name ${state.pod.getMetadata.getName}.")
+            val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+            if (onFinalNonDeletedState(succeeded, execId, schedulerBackend, 
deleteFromK8s)) {
+              execIdsRemovedInThisRound += execId
+              if (schedulerBackend.isExecutorActive(execId.toString)) {
+                logInfo(s"Snapshot reported succeeded executor with id 
$execId, " +
+                  "even though the application has not requested for it to be 
removed.")
+              } else {
+                logDebug(s"Snapshot reported succeeded executor with id 
$execId," +
+                  s" pod name ${state.pod.getMetadata.getName}.")
+              }
             }
-            onFinalNonDeletedState(succeeded, execId, schedulerBackend, 
execIdsRemovedInThisRound)
+
           case _ =>
         }
       }
     }
 
+    // Clean up any pods from the inactive list that don't match any pods from 
the last snapshot.
+    // This makes sure that we don't keep growing that set indefinitely, in 
case we end up missing
+    // an update for some pod.
+    if (inactivatedPods.nonEmpty && snapshots.nonEmpty) {
+      inactivatedPods.retain(snapshots.last.executorPods.contains(_))
+    }
+
     // Reconcile the case where Spark claims to know about an executor but the 
corresponding pod
     // is missing from the cluster. This would occur if we miss a deletion 
event and the pod
-    // transitions immediately from running io absent. We only need to check 
against the latest
+    // transitions immediately from running to absent. We only need to check 
against the latest
     // snapshot for this, and we don't do this for executors in the deleted 
executors cache or
     // that we just removed in this round.
-    if (snapshots.nonEmpty) {
-      val latestSnapshot = snapshots.last
-      (schedulerBackend.getExecutorIds().map(_.toLong).toSet
-        -- latestSnapshot.executorPods.keySet
-        -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
-        if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
-          val exitReasonMessage = s"The executor with ID $missingExecutorId 
was not found in the" +
-            s" cluster but we didn't get a reason why. Marking the executor as 
failed. The" +
-            s" executor may have been deleted but the driver missed the 
deletion event."
-          logDebug(exitReasonMessage)
-          val exitReason = ExecutorExited(
-            UNKNOWN_EXIT_CODE,
-            exitCausedByApp = false,
-            exitReasonMessage)
-          schedulerBackend.doRemoveExecutor(missingExecutorId.toString, 
exitReason)
-          execIdsRemovedInThisRound += missingExecutorId
-        }
+    val lostExecutors = if (snapshots.nonEmpty) {
+      schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+        snapshots.last.executorPods.keySet -- execIdsRemovedInThisRound
+    } else {
+      Nil
+    }
+
+    lostExecutors.foreach { lostId =>
+      if (removedExecutorsCache.getIfPresent(lostId) == null) {
+        val exitReasonMessage = s"The executor with ID $lostId was not found 
in the" +
+          s" cluster but we didn't get a reason why. Marking the executor as 
failed. The" +
+          s" executor may have been deleted but the driver missed the deletion 
event."
+        logDebug(exitReasonMessage)
+        val exitReason = ExecutorExited(
+          UNKNOWN_EXIT_CODE,
+          exitCausedByApp = false,
+          exitReasonMessage)
+        schedulerBackend.doRemoveExecutor(lostId.toString, exitReason)
       }
     }
 
-    if (execIdsRemovedInThisRound.nonEmpty) {
-      logDebug(s"Removed executors with ids 
${execIdsRemovedInThisRound.mkString(",")}" +
+    if (lostExecutors.nonEmpty) {
+      logDebug(s"Removed executors with ids ${lostExecutors.mkString(",")}" +
         s" from Spark that were either found to be deleted or non-existent in 
the cluster.")
     }
   }
@@ -116,35 +142,57 @@ private[spark] class ExecutorPodsLifecycleManager(
       podState: FinalPodState,
       execId: Long,
       schedulerBackend: KubernetesClusterSchedulerBackend,
-      execIdsRemovedInRound: mutable.Set[Long]): Unit = {
-    removeExecutorFromSpark(schedulerBackend, podState, execId)
-    if (shouldDeleteExecutors) {
-      removeExecutorFromK8s(podState.pod)
+      deleteFromK8s: Boolean): Boolean = {
+    val deleted = removeExecutorFromSpark(schedulerBackend, podState, execId)
+    if (deleteFromK8s) {
+      removeExecutorFromK8s(execId, podState.pod)
     }
-    execIdsRemovedInRound += execId
+    deleted
   }
 
-  private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
-    // If deletion failed on a previous try, we can try again if resync 
informs us the pod
-    // is still around.
-    // Delete as best attempt - duplicate deletes will throw an exception but 
the end state
-    // of getting rid of the pod is what matters.
+  private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = {
     Utils.tryLogNonFatalError {
-      kubernetesClient
-        .pods()
-        .withName(updatedPod.getMetadata.getName)
-        .delete()
+      if (shouldDeleteExecutors) {
+        // If deletion failed on a previous try, we can try again if resync 
informs us the pod
+        // is still around.
+        // Delete as best attempt - duplicate deletes will throw an exception 
but the end state
+        // of getting rid of the pod is what matters.
+        kubernetesClient
+          .pods()
+          .withName(updatedPod.getMetadata.getName)
+          .delete()
+      } else if (!inactivatedPods.contains(execId) && 
!isPodInactive(updatedPod)) {
+        // If the config is set to keep the executor  around, mark the pod as 
"inactive" so it
+        // can be ignored in future updates from the API server.
+        logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as 
inactive since " +
+          "deletion is disabled.")
+        val inactivatedPod = new PodBuilder(updatedPod)
+          .editMetadata()
+            .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
+            .endMetadata()
+          .build()
+
+        kubernetesClient
+          .pods()
+          .withName(updatedPod.getMetadata.getName)
+          .patch(inactivatedPod)
+
+        inactivatedPods += execId
+      }
     }
   }
 
   private def removeExecutorFromSpark(
       schedulerBackend: KubernetesClusterSchedulerBackend,
       podState: FinalPodState,
-      execId: Long): Unit = {
+      execId: Long): Boolean = {
     if (removedExecutorsCache.getIfPresent(execId) == null) {
       removedExecutorsCache.put(execId, execId)
       val exitReason = findExitReason(podState, execId)
       schedulerBackend.doRemoveExecutor(execId.toString, exitReason)
+      true
+    } else {
+      false
     }
   }
 
@@ -181,6 +229,10 @@ private[spark] class ExecutorPodsLifecycleManager(
       terminatedContainer.getState.getTerminated.getExitCode.toInt
     }.getOrElse(UNKNOWN_EXIT_CODE)
   }
+
+  private def isPodInactive(pod: Pod): Boolean = {
+    pod.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL) == "true"
+  }
 }
 
 private object ExecutorPodsLifecycleManager {
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index 96a5059..fd8f697 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -59,6 +59,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
         .pods()
         .withLabel(SPARK_APP_ID_LABEL, applicationId)
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
         .list()
         .getItems
         .asScala)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
index 8aa20bf..d68dc3e 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -16,14 +16,19 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
+import java.util.ArrayList
 import java.util.concurrent._
-
-import io.fabric8.kubernetes.api.model.Pod
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
+
 import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model.Pod
 
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Controls the propagation of the Spark application's executor pods state to 
subscribers that
@@ -46,9 +51,11 @@ import org.apache.spark.util.{ThreadUtils, Utils}
  * subscriber's buffer. Subscribers receive blocks of snapshots produced by 
the producers in
  * time-windowed chunks. Each subscriber can choose to receive their snapshot 
chunks at different
  * time intervals.
+ * <br>
+ * The subscriber notification callback is guaranteed to be called from a 
single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: 
ScheduledExecutorService)
-  extends ExecutorPodsSnapshotsStore {
+  extends ExecutorPodsSnapshotsStore with Logging {
 
   private val SNAPSHOT_LOCK = new Object()
 
@@ -61,14 +68,13 @@ private[spark] class 
ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   override def addSubscriber(
       processBatchIntervalMillis: Long)
       (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
-    val newSubscriber = SnapshotsSubscriber(
-        new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+    val newSubscriber = new SnapshotsSubscriber(onNewSnapshots)
     SNAPSHOT_LOCK.synchronized {
-      newSubscriber.snapshotsBuffer.add(currentSnapshot)
+      newSubscriber.addCurrentSnapshot()
     }
     subscribers.add(newSubscriber)
     pollingTasks.add(subscribersExecutor.scheduleWithFixedDelay(
-      () => callSubscriber(newSubscriber),
+      () => newSubscriber.processSnapshots(),
       0L,
       processBatchIntervalMillis,
       TimeUnit.MILLISECONDS))
@@ -77,7 +83,7 @@ private[spark] class 
ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   override def notifySubscribers(): Unit = SNAPSHOT_LOCK.synchronized {
     subscribers.asScala.foreach { s =>
       subscribersExecutor.submit(new Runnable() {
-        override def run(): Unit = callSubscriber(s)
+        override def run(): Unit = s.processSnapshots()
       })
     }
   }
@@ -98,20 +104,57 @@ private[spark] class 
ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   }
 
   private def addCurrentSnapshotToSubscribers(): Unit = {
-    subscribers.asScala.foreach { subscriber =>
-      subscriber.snapshotsBuffer.add(currentSnapshot)
-    }
+    subscribers.asScala.foreach(_.addCurrentSnapshot())
   }
 
-  private def callSubscriber(subscriber: SnapshotsSubscriber): Unit = {
-    Utils.tryLogNonFatalError {
-      val currentSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot].asJava
-      subscriber.snapshotsBuffer.drainTo(currentSnapshots)
-      subscriber.onNewSnapshots(currentSnapshots.asScala)
+  private class SnapshotsSubscriber(onNewSnapshots: Seq[ExecutorPodsSnapshot] 
=> Unit) {
+
+    private val snapshotsBuffer = new 
LinkedBlockingQueue[ExecutorPodsSnapshot]()
+    private val lock = new ReentrantLock()
+    private val notificationCount = new AtomicInteger()
+
+    def addCurrentSnapshot(): Unit = {
+      snapshotsBuffer.add(currentSnapshot)
+    }
+
+    def processSnapshots(): Unit = {
+      notificationCount.incrementAndGet()
+      processSnapshotsInternal()
     }
-  }
 
-  private case class SnapshotsSubscriber(
-      snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot],
-      onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
+    private def processSnapshotsInternal(): Unit = {
+      if (lock.tryLock()) {
+        // Check whether there are pending notifications before calling the 
subscriber. This
+        // is needed to avoid calling the subscriber spuriously when the race 
described in the
+        // comment below happens.
+        if (notificationCount.get() > 0) {
+          try {
+            val snapshots = new ArrayList[ExecutorPodsSnapshot]()
+            snapshotsBuffer.drainTo(snapshots)
+            onNewSnapshots(snapshots.asScala)
+          } catch {
+            case NonFatal(e) => logWarning("Exception when notifying snapshot 
subscriber.", e)
+          } finally {
+            lock.unlock()
+          }
+
+          if (notificationCount.decrementAndGet() > 0) {
+            // There was another concurrent request for this subscriber. 
Schedule a task to
+            // immediately process snapshots again, so that the subscriber can 
pick up any
+            // changes that may have happened between the time it started 
looking at snapshots
+            // above, and the time the concurrent request arrived.
+            //
+            // This has to be done outside of the lock, otherwise we might 
miss a notification
+            // arriving after the above check, but before we've released the 
lock. Flip side is
+            // that we may schedule a useless task that will just fail to grab 
the lock.
+            subscribersExecutor.submit(new Runnable() {
+              override def run(): Unit = processSnapshotsInternal()
+            })
+          }
+        } else {
+          lock.unlock()
+        }
+      }
+    }
+  }
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 9920f4d..fb6f3ac 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -20,7 +20,7 @@ import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{mock, never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
@@ -30,6 +30,7 @@ import scala.collection.mutable
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.scheduler.ExecutorExited
@@ -80,6 +81,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite 
with BeforeAndAfte
   test("Don't remove executors twice from Spark but remove from K8s 
repeatedly.") {
     val failedPod = failedExecutorWithoutDeletion(1)
     snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
     snapshotsStore.updatePod(failedPod)
     snapshotsStore.notifySubscribers()
     val msg = exitReasonMessage(1, failedPod)
@@ -108,7 +110,13 @@ class ExecutorPodsLifecycleManagerSuite extends 
SparkFunSuite with BeforeAndAfte
     val msg = exitReasonMessage(1, failedPod)
     val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
-    verify(podOperations, never()).delete()
+    verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
+
+    val podCaptor = ArgumentCaptor.forClass(classOf[Pod])
+    
verify(namedExecutorPods(failedPod.getMetadata.getName)).patch(podCaptor.capture())
+
+    val pod = podCaptor.getValue()
+    assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) 
=== "true")
   }
 
   private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String 
= {
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
index 1b26d6a..63e43bd 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
@@ -50,6 +50,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends 
SparkFunSuite with BeforeAn
   private var executorRoleLabeledPods: LABELED_PODS = _
 
   @Mock
+  private var activeExecutorPods: LABELED_PODS = _
+
+  @Mock
   private var eventQueue: ExecutorPodsSnapshotsStore = _
 
   private var pollingExecutor: DeterministicScheduler = _
@@ -69,10 +72,12 @@ class ExecutorPodsPollingSnapshotSourceSuite extends 
SparkFunSuite with BeforeAn
       .thenReturn(appIdLabeledPods)
     when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
       .thenReturn(executorRoleLabeledPods)
+    when(executorRoleLabeledPods.withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, 
"true"))
+      .thenReturn(activeExecutorPods)
   }
 
   test("Items returned by the API should be pushed to the event queue") {
-    when(executorRoleLabeledPods.list())
+    when(activeExecutorPods.list())
       .thenReturn(new PodListBuilder()
         .addToItems(
           runningExecutor(1),
@@ -80,6 +85,5 @@ class ExecutorPodsPollingSnapshotSourceSuite extends 
SparkFunSuite with BeforeAn
         .build())
     pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
     verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), 
runningExecutor(2)))
-
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to