This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b8ccd75 [SPARK-29905][K8S] Improve pod lifecycle manager behavior
with dynamic allocation
b8ccd75 is described below
commit b8ccd755244d3cd8a81a9f4a1eafa2a4e48759d2
Author: Marcelo Vanzin <[email protected]>
AuthorDate: Thu Apr 16 14:15:10 2020 -0700
[SPARK-29905][K8S] Improve pod lifecycle manager behavior with dynamic
allocation
This issue mainly shows up when you enable dynamic allocation:
because there are many executor state changes (because of executors
being requested and starting to run, and later stopped), the lifecycle
manager class could end up logging information about the same executor
multiple times, since the different events would cause the same
executor update to be present in multiple pod snapshots. On top of that,
it could end up making multiple redundant calls into the API server
for the same pod.
Another issue was when the config was set to not delete executor
pods; with dynamic allocation, that means pods keep accumulating
in the API server, and every time the full sync is done by the
polling source, all executors, even the finished ones that Spark
technically does not care about anymore, would be processed.
The change modifies the lifecycle monitor so that it:
- logs each executor update a single time, even if it shows up in
multiple snapshots, by checking whether the state change
was already processed.
- marks finished-but-not-deleted-in-k8s executors with a label
so that they can be easily filtered out.
This reduces the amount of logging done by the lifecycle manager,
which is a minor thing in general since the logs are at debug level.
But it also reduces the amount of data that needs to be fetched
from the API server under certain configurations, and overall
reduces interaction with the API server when dynamic allocation is on.
There's also a change in the snapshot store to ensure that the
same subscriber is not called concurrently. The previous concurrent
calls were kind of a bug,
since they meant subscribers could be processing snapshots out of order,
or even that they could block multiple threads (e.g. the allocator
callback was synchronized). I actually ran into the "concurrent calls"
situation in the lifecycle manager during testing, and while it did not
seem to cause problems, it did make for some head scratching while
looking at the logs. It seemed safer to fix that.
Unit tests were updated to check for the changes. Also tested in a real
cluster with dynamic allocation on.
Closes #26535 from vanzin/SPARK-29905.
Lead-authored-by: Marcelo Vanzin <[email protected]>
Co-authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/k8s/Constants.scala | 1 +
.../cluster/k8s/ExecutorPodsAllocator.scala | 2 +-
.../cluster/k8s/ExecutorPodsLifecycleManager.scala | 156 ++++++++++++++-------
.../k8s/ExecutorPodsPollingSnapshotSource.scala | 1 +
.../k8s/ExecutorPodsSnapshotsStoreImpl.scala | 87 +++++++++---
.../k8s/ExecutorPodsLifecycleManagerSuite.scala | 12 +-
.../ExecutorPodsPollingSnapshotSourceSuite.scala | 8 +-
7 files changed, 188 insertions(+), 79 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index a3c74ff7..759c205 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -24,6 +24,7 @@ private[spark] object Constants {
val SPARK_ROLE_LABEL = "spark-role"
val SPARK_POD_DRIVER_ROLE = "driver"
val SPARK_POD_EXECUTOR_ROLE = "executor"
+ val SPARK_EXECUTOR_INACTIVE_LABEL = "spark-exec-inactive"
// Credentials secrets
val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index b394f35..b6ea1fa 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -94,7 +94,7 @@ private[spark] class ExecutorPodsAllocator(
private def onNewSnapshots(
applicationId: String,
- snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
+ snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
// For all executors we've created against the API but have not seen in a
snapshot
// yet - check the current time. If the current time has exceeded some
threshold,
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index d6b7582..5d91e52 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -17,13 +17,14 @@
package org.apache.spark.scheduler.cluster.k8s
import com.google.common.cache.Cache
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorExited
@@ -41,11 +42,14 @@ private[spark] class ExecutorPodsLifecycleManager(
import ExecutorPodsLifecycleManager._
- private val eventProcessingInterval =
conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
-
private lazy val shouldDeleteExecutors =
conf.get(KUBERNETES_DELETE_EXECUTORS)
+ // Keep track of which pods are inactive to avoid contacting the API server
multiple times.
+ // This set is cleaned up when a snapshot containing the updated pod is
processed.
+ private val inactivatedPods = mutable.HashSet.empty[Long]
+
def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ val eventProcessingInterval =
conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
snapshotsStore.addSubscriber(eventProcessingInterval) {
onNewSnapshots(schedulerBackend, _)
}
@@ -58,56 +62,78 @@ private[spark] class ExecutorPodsLifecycleManager(
snapshots.foreach { snapshot =>
snapshot.executorPods.foreach { case (execId, state) =>
state match {
+ case _state if isPodInactive(_state.pod) =>
+ inactivatedPods -= execId
+
case deleted@PodDeleted(_) =>
- logDebug(s"Snapshot reported deleted executor with id $execId," +
- s" pod name ${state.pod.getMetadata.getName}")
- removeExecutorFromSpark(schedulerBackend, deleted, execId)
- execIdsRemovedInThisRound += execId
+ if (removeExecutorFromSpark(schedulerBackend, deleted, execId)) {
+ execIdsRemovedInThisRound += execId
+ logDebug(s"Snapshot reported deleted executor with id $execId," +
+ s" pod name ${state.pod.getMetadata.getName}")
+ }
+ inactivatedPods -= execId
+
case failed@PodFailed(_) =>
- logDebug(s"Snapshot reported failed executor with id $execId," +
- s" pod name ${state.pod.getMetadata.getName}")
- onFinalNonDeletedState(failed, execId, schedulerBackend,
execIdsRemovedInThisRound)
+ val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+ if (onFinalNonDeletedState(failed, execId, schedulerBackend,
deleteFromK8s)) {
+ execIdsRemovedInThisRound += execId
+ logDebug(s"Snapshot reported failed executor with id $execId," +
+ s" pod name ${state.pod.getMetadata.getName}")
+ }
+
case succeeded@PodSucceeded(_) =>
- if (schedulerBackend.isExecutorActive(execId.toString)) {
- logInfo(s"Snapshot reported succeeded executor with id $execId,
" +
- "even though the application has not requested for it to be
removed.")
- } else {
- logDebug(s"Snapshot reported succeeded executor with id
$execId," +
- s" pod name ${state.pod.getMetadata.getName}.")
+ val deleteFromK8s = !execIdsRemovedInThisRound.contains(execId)
+ if (onFinalNonDeletedState(succeeded, execId, schedulerBackend,
deleteFromK8s)) {
+ execIdsRemovedInThisRound += execId
+ if (schedulerBackend.isExecutorActive(execId.toString)) {
+ logInfo(s"Snapshot reported succeeded executor with id
$execId, " +
+ "even though the application has not requested for it to be
removed.")
+ } else {
+ logDebug(s"Snapshot reported succeeded executor with id
$execId," +
+ s" pod name ${state.pod.getMetadata.getName}.")
+ }
}
- onFinalNonDeletedState(succeeded, execId, schedulerBackend,
execIdsRemovedInThisRound)
+
case _ =>
}
}
}
+ // Clean up any pods from the inactive list that don't match any pods from
the last snapshot.
+ // This makes sure that we don't keep growing that set indefinitely, in
case we end up missing
+ // an update for some pod.
+ if (inactivatedPods.nonEmpty && snapshots.nonEmpty) {
+ inactivatedPods.retain(snapshots.last.executorPods.contains(_))
+ }
+
// Reconcile the case where Spark claims to know about an executor but the
corresponding pod
// is missing from the cluster. This would occur if we miss a deletion
event and the pod
- // transitions immediately from running io absent. We only need to check
against the latest
+ // transitions immediately from running to absent. We only need to check
against the latest
// snapshot for this, and we don't do this for executors in the deleted
executors cache or
// that we just removed in this round.
- if (snapshots.nonEmpty) {
- val latestSnapshot = snapshots.last
- (schedulerBackend.getExecutorIds().map(_.toLong).toSet
- -- latestSnapshot.executorPods.keySet
- -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
- if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
- val exitReasonMessage = s"The executor with ID $missingExecutorId
was not found in the" +
- s" cluster but we didn't get a reason why. Marking the executor as
failed. The" +
- s" executor may have been deleted but the driver missed the
deletion event."
- logDebug(exitReasonMessage)
- val exitReason = ExecutorExited(
- UNKNOWN_EXIT_CODE,
- exitCausedByApp = false,
- exitReasonMessage)
- schedulerBackend.doRemoveExecutor(missingExecutorId.toString,
exitReason)
- execIdsRemovedInThisRound += missingExecutorId
- }
+ val lostExecutors = if (snapshots.nonEmpty) {
+ schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+ snapshots.last.executorPods.keySet -- execIdsRemovedInThisRound
+ } else {
+ Nil
+ }
+
+ lostExecutors.foreach { lostId =>
+ if (removedExecutorsCache.getIfPresent(lostId) == null) {
+ val exitReasonMessage = s"The executor with ID $lostId was not found
in the" +
+ s" cluster but we didn't get a reason why. Marking the executor as
failed. The" +
+ s" executor may have been deleted but the driver missed the deletion
event."
+ logDebug(exitReasonMessage)
+ val exitReason = ExecutorExited(
+ UNKNOWN_EXIT_CODE,
+ exitCausedByApp = false,
+ exitReasonMessage)
+ schedulerBackend.doRemoveExecutor(lostId.toString, exitReason)
}
}
- if (execIdsRemovedInThisRound.nonEmpty) {
- logDebug(s"Removed executors with ids
${execIdsRemovedInThisRound.mkString(",")}" +
+ if (lostExecutors.nonEmpty) {
+ logDebug(s"Removed executors with ids ${lostExecutors.mkString(",")}" +
s" from Spark that were either found to be deleted or non-existent in
the cluster.")
}
}
@@ -116,35 +142,57 @@ private[spark] class ExecutorPodsLifecycleManager(
podState: FinalPodState,
execId: Long,
schedulerBackend: KubernetesClusterSchedulerBackend,
- execIdsRemovedInRound: mutable.Set[Long]): Unit = {
- removeExecutorFromSpark(schedulerBackend, podState, execId)
- if (shouldDeleteExecutors) {
- removeExecutorFromK8s(podState.pod)
+ deleteFromK8s: Boolean): Boolean = {
+ val deleted = removeExecutorFromSpark(schedulerBackend, podState, execId)
+ if (deleteFromK8s) {
+ removeExecutorFromK8s(execId, podState.pod)
}
- execIdsRemovedInRound += execId
+ deleted
}
- private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
- // If deletion failed on a previous try, we can try again if resync
informs us the pod
- // is still around.
- // Delete as best attempt - duplicate deletes will throw an exception but
the end state
- // of getting rid of the pod is what matters.
+ private def removeExecutorFromK8s(execId: Long, updatedPod: Pod): Unit = {
Utils.tryLogNonFatalError {
- kubernetesClient
- .pods()
- .withName(updatedPod.getMetadata.getName)
- .delete()
+ if (shouldDeleteExecutors) {
+ // If deletion failed on a previous try, we can try again if resync
informs us the pod
+ // is still around.
+ // Delete as best attempt - duplicate deletes will throw an exception
but the end state
+ // of getting rid of the pod is what matters.
+ kubernetesClient
+ .pods()
+ .withName(updatedPod.getMetadata.getName)
+ .delete()
+ } else if (!inactivatedPods.contains(execId) &&
!isPodInactive(updatedPod)) {
+ // If the config is set to keep the executor around, mark the pod as
"inactive" so it
+ // can be ignored in future updates from the API server.
+ logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as
inactive since " +
+ "deletion is disabled.")
+ val inactivatedPod = new PodBuilder(updatedPod)
+ .editMetadata()
+ .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
+ .endMetadata()
+ .build()
+
+ kubernetesClient
+ .pods()
+ .withName(updatedPod.getMetadata.getName)
+ .patch(inactivatedPod)
+
+ inactivatedPods += execId
+ }
}
}
private def removeExecutorFromSpark(
schedulerBackend: KubernetesClusterSchedulerBackend,
podState: FinalPodState,
- execId: Long): Unit = {
+ execId: Long): Boolean = {
if (removedExecutorsCache.getIfPresent(execId) == null) {
removedExecutorsCache.put(execId, execId)
val exitReason = findExitReason(podState, execId)
schedulerBackend.doRemoveExecutor(execId.toString, exitReason)
+ true
+ } else {
+ false
}
}
@@ -181,6 +229,10 @@ private[spark] class ExecutorPodsLifecycleManager(
terminatedContainer.getState.getTerminated.getExitCode.toInt
}.getOrElse(UNKNOWN_EXIT_CODE)
}
+
+ private def isPodInactive(pod: Pod): Boolean = {
+ pod.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL) == "true"
+ }
}
private object ExecutorPodsLifecycleManager {
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index 96a5059..fd8f697 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -59,6 +59,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
.list()
.getItems
.asScala)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
index 8aa20bf..d68dc3e 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -16,14 +16,19 @@
*/
package org.apache.spark.scheduler.cluster.k8s
+import java.util.ArrayList
import java.util.concurrent._
-
-import io.fabric8.kubernetes.api.model.Pod
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.GuardedBy
+
import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model.Pod
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
/**
* Controls the propagation of the Spark application's executor pods state to
subscribers that
@@ -46,9 +51,11 @@ import org.apache.spark.util.{ThreadUtils, Utils}
* subscriber's buffer. Subscribers receive blocks of snapshots produced by
the producers in
* time-windowed chunks. Each subscriber can choose to receive their snapshot
chunks at different
* time intervals.
+ * <br>
+ * The subcriber notification callback is guaranteed to be called from a
single thread at a time.
*/
private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor:
ScheduledExecutorService)
- extends ExecutorPodsSnapshotsStore {
+ extends ExecutorPodsSnapshotsStore with Logging {
private val SNAPSHOT_LOCK = new Object()
@@ -61,14 +68,13 @@ private[spark] class
ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
override def addSubscriber(
processBatchIntervalMillis: Long)
(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
- val newSubscriber = SnapshotsSubscriber(
- new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+ val newSubscriber = new SnapshotsSubscriber(onNewSnapshots)
SNAPSHOT_LOCK.synchronized {
- newSubscriber.snapshotsBuffer.add(currentSnapshot)
+ newSubscriber.addCurrentSnapshot()
}
subscribers.add(newSubscriber)
pollingTasks.add(subscribersExecutor.scheduleWithFixedDelay(
- () => callSubscriber(newSubscriber),
+ () => newSubscriber.processSnapshots(),
0L,
processBatchIntervalMillis,
TimeUnit.MILLISECONDS))
@@ -77,7 +83,7 @@ private[spark] class
ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
override def notifySubscribers(): Unit = SNAPSHOT_LOCK.synchronized {
subscribers.asScala.foreach { s =>
subscribersExecutor.submit(new Runnable() {
- override def run(): Unit = callSubscriber(s)
+ override def run(): Unit = s.processSnapshots()
})
}
}
@@ -98,20 +104,57 @@ private[spark] class
ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
}
private def addCurrentSnapshotToSubscribers(): Unit = {
- subscribers.asScala.foreach { subscriber =>
- subscriber.snapshotsBuffer.add(currentSnapshot)
- }
+ subscribers.asScala.foreach(_.addCurrentSnapshot())
}
- private def callSubscriber(subscriber: SnapshotsSubscriber): Unit = {
- Utils.tryLogNonFatalError {
- val currentSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot].asJava
- subscriber.snapshotsBuffer.drainTo(currentSnapshots)
- subscriber.onNewSnapshots(currentSnapshots.asScala)
+ private class SnapshotsSubscriber(onNewSnapshots: Seq[ExecutorPodsSnapshot]
=> Unit) {
+
+ private val snapshotsBuffer = new
LinkedBlockingQueue[ExecutorPodsSnapshot]()
+ private val lock = new ReentrantLock()
+ private val notificationCount = new AtomicInteger()
+
+ def addCurrentSnapshot(): Unit = {
+ snapshotsBuffer.add(currentSnapshot)
+ }
+
+ def processSnapshots(): Unit = {
+ notificationCount.incrementAndGet()
+ processSnapshotsInternal()
}
- }
- private case class SnapshotsSubscriber(
- snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot],
- onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
+ private def processSnapshotsInternal(): Unit = {
+ if (lock.tryLock()) {
+ // Check whether there are pending notifications before calling the
subscriber. This
+ // is needed to avoid calling the subscriber spuriously when the race
described in the
+ // comment below happens.
+ if (notificationCount.get() > 0) {
+ try {
+ val snapshots = new ArrayList[ExecutorPodsSnapshot]()
+ snapshotsBuffer.drainTo(snapshots)
+ onNewSnapshots(snapshots.asScala)
+ } catch {
+ case NonFatal(e) => logWarning("Exception when notifying snapshot
subscriber.", e)
+ } finally {
+ lock.unlock()
+ }
+
+ if (notificationCount.decrementAndGet() > 0) {
+ // There was another concurrent request for this subcriber.
Schedule a task to
+ // immediately process snapshots again, so that the subscriber can
pick up any
+ // changes that may have happened between the time it started
looking at snapshots
+ // above, and the time the concurrent request arrived.
+ //
+ // This has to be done outside of the lock, otherwise we might
miss a notification
+ // arriving after the above check, but before we've released the
lock. Flip side is
+ // that we may schedule a useless task that will just fail to grab
the lock.
+ subscribersExecutor.submit(new Runnable() {
+ override def run(): Unit = processSnapshotsInternal()
+ })
+ }
+ } else {
+ lock.unlock()
+ }
+ }
+ }
+ }
}
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 9920f4d..fb6f3ac 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -20,7 +20,7 @@ import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
@@ -30,6 +30,7 @@ import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config
+import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.scheduler.ExecutorExited
@@ -80,6 +81,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite
with BeforeAndAfte
test("Don't remove executors twice from Spark but remove from K8s
repeatedly.") {
val failedPod = failedExecutorWithoutDeletion(1)
snapshotsStore.updatePod(failedPod)
+ snapshotsStore.notifySubscribers()
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod)
@@ -108,7 +110,13 @@ class ExecutorPodsLifecycleManagerSuite extends
SparkFunSuite with BeforeAndAfte
val msg = exitReasonMessage(1, failedPod)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
- verify(podOperations, never()).delete()
+ verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
+
+ val podCaptor = ArgumentCaptor.forClass(classOf[Pod])
+
verify(namedExecutorPods(failedPod.getMetadata.getName)).patch(podCaptor.capture())
+
+ val pod = podCaptor.getValue()
+ assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL)
=== "true")
}
private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String
= {
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
index 1b26d6a..63e43bd 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
@@ -50,6 +50,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends
SparkFunSuite with BeforeAn
private var executorRoleLabeledPods: LABELED_PODS = _
@Mock
+ private var activeExecutorPods: LABELED_PODS = _
+
+ @Mock
private var eventQueue: ExecutorPodsSnapshotsStore = _
private var pollingExecutor: DeterministicScheduler = _
@@ -69,10 +72,12 @@ class ExecutorPodsPollingSnapshotSourceSuite extends
SparkFunSuite with BeforeAn
.thenReturn(appIdLabeledPods)
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
+ when(executorRoleLabeledPods.withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL,
"true"))
+ .thenReturn(activeExecutorPods)
}
test("Items returned by the API should be pushed to the event queue") {
- when(executorRoleLabeledPods.list())
+ when(activeExecutorPods.list())
.thenReturn(new PodListBuilder()
.addToItems(
runningExecutor(1),
@@ -80,6 +85,5 @@ class ExecutorPodsPollingSnapshotSourceSuite extends
SparkFunSuite with BeforeAn
.build())
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1),
runningExecutor(2)))
-
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]