This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b100e41 [SPARK-52581] Revise AppCleanUpStep to include cleanup logic
for all states
b100e41 is described below
commit b100e4164b994a11fd4eff6759a11e6495e10316
Author: Zhou JIANG <[email protected]>
AuthorDate: Sat Jun 28 11:17:21 2025 -0700
[SPARK-52581] Revise AppCleanUpStep to include cleanup logic for all states
### What changes were proposed in this pull request?
This PR updates AppCleanUpStep so that it can run as a regular step of
each reconciliation, and removes the now-unused AppTerminatedStep.
### Why are the changes needed?
Previously, AppTerminatedStep was performed at the beginning of each
reconciliation to end the reconciliation early if the app had reached a terminated state.
However, in some scenarios we may still want to clean up
applications that have already terminated:
* If an app in the "terminated without releasing resources" state is being deleted,
we would like AppCleanUpStep to be performed, since it covers a few corner
cases that may not be covered by the general delete flow.
* We may also introduce the concept of a "maximal retention duration" for
applications, alongside the current "retention policy", in the near future to support
lifecycle management of application secondary resources. That would also
require cleaning up secondary resources for already-terminated applications.
The revised AppCleanUpStep covers everything the previous
AppTerminatedStep did, and also cleans up resources for applications in the
"TerminatedWithoutReleaseResources" state when needed.
### Does this PR introduce any user-facing change?
No. Only internal reconcile steps are changed.
### How was this patch tested?
Existing CI and E2E tests cover the cleanup logic; new unit tests were added in AppCleanUpStepTest.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #257 from jiangzho/cleanupForCancel.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../operator/reconciler/SparkAppReconciler.java | 12 +-
.../reconciler/reconcilesteps/AppCleanUpStep.java | 115 ++++++--
.../reconcilesteps/AppTerminatedStep.java | 44 ---
.../reconcilesteps/AppCleanUpStepTest.java | 297 +++++++++++++++++++++
4 files changed, 386 insertions(+), 82 deletions(-)
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
index 9b1323b..c3b42e7 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
@@ -56,7 +56,6 @@ import
org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppInitStep;
import
org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppReconcileStep;
import
org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppResourceObserveStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppRunningStep;
-import
org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppTerminatedStep;
import
org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppUnknownStateStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppValidateStep;
import org.apache.spark.k8s.operator.utils.LoggingUtils;
@@ -144,7 +143,7 @@ public class SparkAppReconciler implements
Reconciler<SparkApplication>, Cleaner
protected List<AppReconcileStep> getReconcileSteps(final SparkApplication
app) {
List<AppReconcileStep> steps = new ArrayList<>();
steps.add(new AppValidateStep());
- steps.add(new AppTerminatedStep());
+ steps.add(new AppCleanUpStep());
switch (app.getStatus().getCurrentState().getCurrentStateSummary()) {
case Submitted, ScheduledToRestart -> steps.add(new AppInitStep());
case DriverRequested, DriverStarted -> {
@@ -166,14 +165,6 @@ public class SparkAppReconciler implements
Reconciler<SparkApplication>, Cleaner
steps.add(
new AppResourceObserveStep(Collections.singletonList(new
AppDriverTimeoutObserver())));
}
- case DriverReadyTimedOut,
- DriverStartTimedOut,
- ExecutorsStartTimedOut,
- Succeeded,
- DriverEvicted,
- Failed,
- SchedulingFailure ->
- steps.add(new AppCleanUpStep());
default -> steps.add(new AppUnknownStateStep());
}
return steps;
@@ -196,7 +187,6 @@ public class SparkAppReconciler implements
Reconciler<SparkApplication>, Cleaner
SparkAppContext ctx = new SparkAppContext(sparkApplication, context,
submissionWorker);
List<AppReconcileStep> cleanupSteps = new ArrayList<>();
cleanupSteps.add(new AppValidateStep());
- cleanupSteps.add(new AppTerminatedStep());
cleanupSteps.add(new AppCleanUpStep(SparkAppStatusUtils::appCancelled));
for (AppReconcileStep step : cleanupSteps) {
ReconcileProgress progress = step.reconcile(ctx,
sparkAppStatusRecorder);
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
index 3f01134..f7b61bf 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
@@ -23,6 +23,8 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
import java.util.Optional;
import java.util.function.Supplier;
@@ -54,38 +56,78 @@ import
org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
@NoArgsConstructor
@Slf4j
public class AppCleanUpStep extends AppReconcileStep {
- private Supplier<ApplicationState> cleanUpSuccessStateSupplier;
+ private Supplier<ApplicationState> onDemandCleanUpReason;
+ /**
+ * Cleanup secondary resources for an application if needed and updates
application status
+ * accordingly. This step would be performed right after validation step in
each reconcile as a
+ * sanity check. It may end the reconciliation if no more actions are
needed. In addition, it can
+ * be performed on demand with a reason for cleanup secondary resources.
+ *
+ * <p>An app expects its secondary resources to be released if any of the
below is true:
+ *
+ * <ul>
+ * <li>When the application is being deleted on demand(e.g. being deleted)
with a reason
+ * <li>When the application is stopping
+ * <li>When the application has terminated without releasing resources,
but it has exceeded
+ * configured retention duration
+ * </ul>
+ *
+ * <p>It would proceed to next steps with no actions for application in
other states. Note that
+ * when even the reconciler decides to proceed with clean up, sub-resources
may still be retained
+ * based on tolerations.
+ *
+ * @param context context for the app
+ * @param statusRecorder recorder for status updates in this reconcile
+ * @return the reconcile progress
+ */
@Override
public ReconcileProgress reconcile(
SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
- ApplicationStatus currentStatus = context.getResource().getStatus();
- ApplicationTolerations tolerations =
- context.getResource().getSpec().getApplicationTolerations();
- ResourceRetainPolicy resourceRetainPolicy =
tolerations.getResourceRetainPolicy();
+ SparkApplication application = context.getResource();
+ ApplicationStatus currentStatus = application.getStatus();
+ ApplicationState currentState = currentStatus.getCurrentState();
+ ApplicationTolerations tolerations =
application.getSpec().getApplicationTolerations();
+ if
(ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary()))
{
+ statusRecorder.removeCachedStatus(application);
+ return ReconcileProgress.completeAndNoRequeue();
+ }
String stateMessage = null;
-
- if (retainReleaseResource(resourceRetainPolicy,
currentStatus.getCurrentState())) {
- if (tolerations.getRestartConfig() != null
- &&
!RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy()))
{
- stateMessage =
- "Application is configured to restart, resources created in
current "
- + "attempt would be force released.";
- log.warn(stateMessage);
+ if (isOnDemandCleanup()) {
+ log.info("Cleaning up application resources on demand");
+ } else {
+ if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
+ currentState.getCurrentStateSummary())) {
+ statusRecorder.removeCachedStatus(application);
+ return ReconcileProgress.completeAndNoRequeue();
+ } else if (currentState.getCurrentStateSummary().isStopping()) {
+ if (retainReleaseResourceForPolicyAndState(
+ tolerations.getResourceRetainPolicy(), currentState)) {
+ if (tolerations.getRestartConfig() != null
+ &&
!RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy()))
{
+ stateMessage =
+ "Application is configured to restart, resources created in
current "
+ + "attempt would be force released.";
+ log.warn(stateMessage);
+ } else {
+ ApplicationState terminationState =
+ new ApplicationState(
+ ApplicationStateSummary.TerminatedWithoutReleaseResources,
+ "Application is terminated without releasing resources as
configured.");
+ long requeueAfterMillis =
+
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
+ return appendStateAndRequeueAfter(
+ context, statusRecorder, terminationState,
Duration.ofMillis(requeueAfterMillis));
+ }
+ }
} else {
- ApplicationState terminationState =
- new ApplicationState(
- ApplicationStateSummary.TerminatedWithoutReleaseResources,
- "Application is terminated without releasing resources as
configured.");
- long requeueAfterMillis =
-
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
- return appendStateAndRequeueAfter(
- context, statusRecorder, terminationState,
Duration.ofMillis(requeueAfterMillis));
+ log.debug("Clean up is not expected for app, proceeding to next
step.");
+ return ReconcileProgress.proceed();
}
}
+
List<HasMetadata> resourcesToRemove = new ArrayList<>();
- if (ApplicationStateSummary.SchedulingFailure.equals(
- currentStatus.getCurrentState().getCurrentStateSummary())) {
+ if (isReleasingResourcesForSchedulingFailureAttempt(currentStatus)) {
// if app failed at scheduling, re-compute all spec and delete as they
may not be fully
// owned by driver
try {
@@ -110,13 +152,13 @@ public class AppCleanUpStep extends AppReconcileStep {
Optional<Pod> driver = context.getDriverPod();
driver.ifPresent(resourcesToRemove::add);
}
- boolean forceDelete = enableForceDelete(context.getResource());
+ boolean forceDelete = enableForceDelete(application);
for (HasMetadata resource : resourcesToRemove) {
ReconcilerUtils.deleteResourceIfExists(context.getClient(), resource,
forceDelete);
}
ApplicationStatus updatedStatus;
- if (cleanUpSuccessStateSupplier != null) {
- ApplicationState state = cleanUpSuccessStateSupplier.get();
+ if (onDemandCleanUpReason != null) {
+ ApplicationState state = onDemandCleanUpReason.get();
if (StringUtils.isNotEmpty(stateMessage)) {
state.setMessage(stateMessage);
}
@@ -142,7 +184,26 @@ public class AppCleanUpStep extends AppReconcileStep {
}
}
- protected boolean retainReleaseResource(
+ protected boolean isOnDemandCleanup() {
+ return onDemandCleanUpReason != null;
+ }
+
+ protected boolean isReleasingResourcesForSchedulingFailureAttempt(
+ final ApplicationStatus status) {
+ ApplicationState lastObservedState = status.getCurrentState();
+ if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
+ lastObservedState.getCurrentStateSummary())) {
+ // if the app has already terminated, use the last observed state before
termination
+ NavigableMap<Long, ApplicationState> navMap =
+ (NavigableMap<Long, ApplicationState>)
status.getStateTransitionHistory();
+ Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry();
+ lastObservedState =
navMap.lowerEntry(terminateState.getKey()).getValue();
+ }
+ return ApplicationStateSummary.SchedulingFailure.equals(
+ lastObservedState.getCurrentStateSummary());
+ }
+
+ protected boolean retainReleaseResourceForPolicyAndState(
ResourceRetainPolicy resourceRetainPolicy, ApplicationState
currentState) {
return switch (resourceRetainPolicy) {
case Never -> false;
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java
deleted file mode 100644
index 16a4155..0000000
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
-
-import static
org.apache.spark.k8s.operator.reconciler.ReconcileProgress.proceed;
-
-import org.apache.spark.k8s.operator.context.SparkAppContext;
-import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
-import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
-
-/** Observes whether app is already terminated. If so, end the reconcile. */
-public class AppTerminatedStep extends AppReconcileStep {
- @Override
- public ReconcileProgress reconcile(
- SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
- if (context
- .getResource()
- .getStatus()
- .getCurrentState()
- .getCurrentStateSummary()
- .isTerminated()) {
- statusRecorder.removeCachedStatus(context.getResource());
- return ReconcileProgress.completeAndNoRequeue();
- }
- return proceed();
- }
-}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
index 388be41..1c4637e 100644
---
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
@@ -19,14 +19,42 @@
package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+import java.time.Duration;
import java.time.Instant;
+import java.util.Collections;
+import java.util.Optional;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.context.SparkAppContext;
+import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+import org.apache.spark.k8s.operator.utils.ReconcilerUtils;
+import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
+import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;
+@SuppressWarnings("PMD.NcssCount")
class AppCleanUpStepTest {
@Test
void enableForceDelete() {
@@ -41,4 +69,273 @@ class AppCleanUpStepTest {
.setForceTerminationGracePeriodMillis(3000L);
assertTrue(appCleanUpStep.enableForceDelete(app));
}
+
+ @Test
+ void routineCleanupForRunningAppExpectNoAction() {
+ SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
+ AppCleanUpStep routineCheck = new AppCleanUpStep();
+ for (ApplicationStateSummary stateSummary :
ApplicationStateSummary.values()) {
+ if (!stateSummary.isStopping() && !stateSummary.isTerminated()) {
+ ApplicationStatus status = prepareApplicationStatus(stateSummary);
+ ApplicationSpec spec = ApplicationSpec.builder().build();
+ SparkApplication mockApp = mock(SparkApplication.class);
+ when(mockApp.getStatus()).thenReturn(status);
+ when(mockApp.getSpec()).thenReturn(spec);
+ SparkAppContext mockAppContext = mock(SparkAppContext.class);
+ when(mockAppContext.getResource()).thenReturn(mockApp);
+ ReconcileProgress progress = routineCheck.reconcile(mockAppContext,
mockRecorder);
+ Assertions.assertEquals(ReconcileProgress.proceed(), progress);
+ verify(mockAppContext).getResource();
+ verify(mockApp).getSpec();
+ verify(mockApp).getStatus();
+ verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
+ }
+ }
+ }
+
+ @Test
+ void onDemandCleanupForRunningAppExpectDelete() {
+ SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
+ AppCleanUpStep cleanUpWithReason = new
AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+ for (ApplicationStateSummary stateSummary :
ApplicationStateSummary.values()) {
+ if (!stateSummary.isStopping() && !stateSummary.isTerminated()) {
+ ApplicationStatus status = prepareApplicationStatus(stateSummary);
+ ApplicationSpec spec = ApplicationSpec.builder().build();
+ SparkApplication mockApp = mock(SparkApplication.class);
+ when(mockApp.getStatus()).thenReturn(status);
+ when(mockApp.getSpec()).thenReturn(spec);
+ SparkAppContext mockAppContext = mock(SparkAppContext.class);
+ when(mockAppContext.getResource()).thenReturn(mockApp);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
+ when(mockAppContext.getClient()).thenReturn(mockClient);
+ Pod driverPod = mock(Pod.class);
+ when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
+
when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+
when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+
+ try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
+ ReconcileProgress progress =
cleanUpWithReason.reconcile(mockAppContext, mockRecorder);
+ utils.verify(() ->
ReconcilerUtils.deleteResourceIfExists(mockClient, driverPod, false));
+ Assertions.assertEquals(
+
ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress);
+ }
+
+ verify(mockAppContext).getResource();
+ verify(mockApp, times(2)).getSpec();
+ verify(mockApp, times(2)).getStatus();
+ verify(mockAppContext).getClient();
+ verify(mockAppContext).getDriverPod();
+ ArgumentCaptor<ApplicationState> captor =
ArgumentCaptor.forClass(ApplicationState.class);
+ verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext),
captor.capture());
+ ApplicationState appState = captor.getValue();
+ Assertions.assertEquals(
+ ApplicationStateSummary.ResourceReleased,
appState.getCurrentStateSummary());
+ Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE,
appState.getMessage());
+ verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp,
mockClient, driverPod);
+ }
+ }
+ }
+
+ @Test
+ void routineCleanupForTerminatedAppExpectNoAction() {
+ SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
+ AppCleanUpStep routineCheck = new AppCleanUpStep();
+ for (ApplicationStateSummary stateSummary :
ApplicationStateSummary.values()) {
+ if (stateSummary.isTerminated()) {
+ ApplicationStatus status = prepareApplicationStatus(stateSummary);
+ SparkApplication mockApp = mock(SparkApplication.class);
+ ApplicationSpec spec = ApplicationSpec.builder().build();
+ when(mockApp.getStatus()).thenReturn(status);
+ SparkAppContext mockAppContext = mock(SparkAppContext.class);
+ when(mockAppContext.getResource()).thenReturn(mockApp);
+ when(mockApp.getSpec()).thenReturn(spec);
+ ReconcileProgress progress = routineCheck.reconcile(mockAppContext,
mockRecorder);
+ Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(),
progress);
+ verify(mockAppContext).getResource();
+ verify(mockApp).getSpec();
+ verify(mockApp).getStatus();
+ verify(mockRecorder).removeCachedStatus(mockApp);
+ verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
+ }
+ }
+ }
+
+ @Test
+ void onDemandCleanupForTerminatedAppExpectNoAction() {
+ SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
+ AppCleanUpStep routineCheck = new AppCleanUpStep();
+ ApplicationStatus status =
prepareApplicationStatus(ApplicationStateSummary.ResourceReleased);
+ SparkApplication mockApp = mock(SparkApplication.class);
+ ApplicationSpec spec = ApplicationSpec.builder().build();
+ when(mockApp.getStatus()).thenReturn(status);
+ SparkAppContext mockAppContext = mock(SparkAppContext.class);
+ when(mockAppContext.getResource()).thenReturn(mockApp);
+ when(mockApp.getSpec()).thenReturn(spec);
+ ReconcileProgress progress = routineCheck.reconcile(mockAppContext,
mockRecorder);
+ Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(),
progress);
+ verify(mockAppContext).getResource();
+ verify(mockApp).getSpec();
+ verify(mockApp).getStatus();
+ verify(mockRecorder).removeCachedStatus(mockApp);
+ verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
+ }
+
+ @Test
+ void onDemandCleanupForTerminatedAppExpectDelete() {
+ SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
+ AppCleanUpStep cleanUpWithReason = new
AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+ ApplicationStatus status =
+
prepareApplicationStatus(ApplicationStateSummary.TerminatedWithoutReleaseResources);
+ SparkApplication mockApp = mock(SparkApplication.class);
+ ApplicationSpec spec = ApplicationSpec.builder().build();
+ when(mockApp.getStatus()).thenReturn(status);
+ SparkAppContext mockAppContext = mock(SparkAppContext.class);
+ when(mockAppContext.getResource()).thenReturn(mockApp);
+ when(mockApp.getSpec()).thenReturn(spec);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
+ when(mockAppContext.getClient()).thenReturn(mockClient);
+ Pod driverPod = mock(Pod.class);
+ when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
+
when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+
when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+
+ try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
+ ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext,
mockRecorder);
+ utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient,
driverPod, false));
+ Assertions.assertEquals(
+ ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)),
progress);
+ }
+
+ verify(mockAppContext).getResource();
+ verify(mockApp, times(2)).getSpec();
+ verify(mockApp, times(2)).getStatus();
+ verify(mockAppContext).getClient();
+ verify(mockAppContext).getDriverPod();
+ ArgumentCaptor<ApplicationState> captor =
ArgumentCaptor.forClass(ApplicationState.class);
+ verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext),
captor.capture());
+ ApplicationState appState = captor.getValue();
+ Assertions.assertEquals(
+ ApplicationStateSummary.ResourceReleased,
appState.getCurrentStateSummary());
+ Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE,
appState.getMessage());
+ verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp,
mockClient, driverPod);
+ }
+
+ @Test
+ void cleanupForAppExpectDeleteWithRecompute() {
+ SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
+ AppCleanUpStep cleanUpWithReason = new
AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+ ApplicationStatus status1 =
prepareApplicationStatus(ApplicationStateSummary.SchedulingFailure);
+ ApplicationStatus status2 =
+ prepareApplicationStatus(
+ ApplicationStateSummary.SchedulingFailure,
+ ApplicationStateSummary.TerminatedWithoutReleaseResources);
+ SparkApplication mockApp1 = mock(SparkApplication.class);
+ SparkApplication mockApp2 = mock(SparkApplication.class);
+ ApplicationSpec spec = ApplicationSpec.builder().build();
+ when(mockApp1.getStatus()).thenReturn(status1);
+ when(mockApp2.getStatus()).thenReturn(status2);
+ SparkAppContext mockAppContext1 = mock(SparkAppContext.class);
+ SparkAppContext mockAppContext2 = mock(SparkAppContext.class);
+ when(mockAppContext1.getResource()).thenReturn(mockApp1);
+ when(mockAppContext2.getResource()).thenReturn(mockApp2);
+ when(mockApp1.getSpec()).thenReturn(spec);
+ when(mockApp2.getSpec()).thenReturn(spec);
+ KubernetesClient mockClient = mock(KubernetesClient.class);
+ when(mockAppContext1.getClient()).thenReturn(mockClient);
+ Pod driverPod = mock(Pod.class);
+ Pod driverPodSpec = mock(Pod.class);
+ ConfigMap resource1 = mock(ConfigMap.class);
+ ConfigMap resource2 = mock(ConfigMap.class);
+ when(mockAppContext1.getDriverPod()).thenReturn(Optional.of(driverPod));
+ when(mockAppContext1.getDriverPodSpec()).thenReturn(driverPodSpec);
+ when(mockAppContext1.getDriverPreResourcesSpec())
+ .thenReturn(Collections.singletonList(resource1));
+
when(mockAppContext1.getDriverResourcesSpec()).thenReturn(Collections.singletonList(resource2));
+ when(mockAppContext2.getDriverPod()).thenReturn(Optional.of(driverPod));
+ when(mockAppContext2.getDriverPodSpec()).thenReturn(driverPodSpec);
+ when(mockAppContext2.getDriverPreResourcesSpec())
+ .thenReturn(Collections.singletonList(resource1));
+
when(mockAppContext2.getDriverResourcesSpec()).thenReturn(Collections.singletonList(resource2));
+
+ try (MockedStatic<ReconcilerUtils> utils =
Mockito.mockStatic(ReconcilerUtils.class)) {
+ ReconcileProgress progress1 =
cleanUpWithReason.reconcile(mockAppContext1, mockRecorder);
+ ReconcileProgress progress2 =
cleanUpWithReason.reconcile(mockAppContext2, mockRecorder);
+ utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient,
resource1, false));
+ utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient,
driverPodSpec, false));
+ utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient,
resource2, false));
+ Assertions.assertEquals(
+ ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)),
progress1);
+ Assertions.assertEquals(
+ ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)),
progress2);
+ }
+
+ verify(mockAppContext1).getResource();
+ verify(mockApp1, times(2)).getSpec();
+ verify(mockApp1, times(2)).getStatus();
+ verify(mockAppContext1, times(3)).getClient();
+ verify(mockAppContext1).getDriverPreResourcesSpec();
+ verify(mockAppContext1).getDriverPodSpec();
+ verify(mockAppContext1).getDriverResourcesSpec();
+ verify(mockAppContext2).getResource();
+ verify(mockApp2, times(2)).getSpec();
+ verify(mockApp2, times(2)).getStatus();
+ verify(mockAppContext2, times(3)).getClient();
+ verify(mockAppContext2).getDriverPreResourcesSpec();
+ verify(mockAppContext2).getDriverPodSpec();
+ verify(mockAppContext2).getDriverResourcesSpec();
+ ArgumentCaptor<ApplicationState> captor =
ArgumentCaptor.forClass(ApplicationState.class);
+ verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext1),
captor.capture());
+ verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext2),
captor.capture());
+ Assertions.assertEquals(2, captor.getAllValues().size());
+ ApplicationState appState1 = captor.getAllValues().get(0);
+ Assertions.assertEquals(
+ ApplicationStateSummary.ResourceReleased,
appState1.getCurrentStateSummary());
+ Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE,
appState1.getMessage());
+ ApplicationState appState2 = captor.getAllValues().get(1);
+ Assertions.assertEquals(
+ ApplicationStateSummary.ResourceReleased,
appState2.getCurrentStateSummary());
+ Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE,
appState2.getMessage());
+ verifyNoMoreInteractions(
+ mockAppContext1, mockAppContext2, mockRecorder, mockApp1, mockApp2,
mockClient, driverPod);
+ }
+
+ private ApplicationStatus prepareApplicationStatus(ApplicationStateSummary
currentStateSummary) {
+ ApplicationStatus status = new ApplicationStatus();
+ ApplicationState state = new ApplicationState(currentStateSummary, "foo");
+ return status.appendNewState(state);
+ }
+
+ private ApplicationStatus prepareApplicationStatus(
+ ApplicationStateSummary currentStateSummary, ApplicationStateSummary
previousStateSummary) {
+ ApplicationStatus status = prepareApplicationStatus(previousStateSummary);
+ ApplicationState state = new ApplicationState(currentStateSummary, "foo");
+ return status.appendNewState(state);
+ }
+
+ @Test
+ void isReleasingResourcesForSchedulingFailureAttempt() {
+ AppCleanUpStep appCleanUpStep = new AppCleanUpStep();
+ ApplicationStatus status = new ApplicationStatus();
+
assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status));
+ status =
+ status.appendNewState(new
ApplicationState(ApplicationStateSummary.DriverRequested, "foo"));
+
assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status));
+ status =
+ status.appendNewState(new
ApplicationState(ApplicationStateSummary.RunningHealthy, "foo"));
+
assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status));
+ status = status.appendNewState(new
ApplicationState(ApplicationStateSummary.Failed, "foo"));
+
assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status));
+ status =
+ status.appendNewState(
+ new ApplicationState(ApplicationStateSummary.ScheduledToRestart,
"foo"));
+
assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status));
+ status =
+ status.appendNewState(
+ new ApplicationState(ApplicationStateSummary.SchedulingFailure,
"foo"));
+
assertTrue(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status));
+ status =
+ status.appendNewState(
+ new
ApplicationState(ApplicationStateSummary.TerminatedWithoutReleaseResources,
"foo"));
+
assertTrue(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]