This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 2895467 [SPARK-52648] Add support for maximal retain duration for Spark application resources 2895467 is described below commit 28954679f2e9ec352054aba09f8c4bde2f209ed2 Author: Zhou JIANG <jiang...@umich.edu> AuthorDate: Fri Jul 11 19:52:08 2025 -0700 [SPARK-52648] Add support for maximal retain duration for Spark application resources ### What changes were proposed in this pull request? This PR adds support for configuring the maximal retain duration for Spark apps. Working with the resourceRetainPolicy, it enhances the garbage collection mechanism. ### Why are the changes needed? The current resourceRetainPolicy provides flexibility for retaining Spark app resources after they terminate. Introducing a maximal retain duration adds a protection layer that prevents terminated resources (pods, config maps, etc.) from holding on to cluster quota. ### Does this PR introduce _any_ user-facing change? Yes, a new configurable field `spec.applicationTolerations.resourceRetainDurationMillis` is added to the SparkApplication CRD. ### How was this patch tested? CIs, including a new unit test and an e2e scenario. ### Was this patch authored or co-authored using generative AI tooling? No Closes #268 from jiangzho/retentionDuration. Authored-by: Zhou JIANG <jiang...@umich.edu> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .github/workflows/build_and_test.yml | 3 + .../sparkapplications.spark.apache.org-v1.yaml | 3 + docs/spark_custom_resources.md | 7 +- .../org/apache/spark/k8s/operator/Constants.java | 2 + .../k8s/operator/spec/ApplicationTolerations.java | 41 ++++ .../reconciler/reconcilesteps/AppCleanUpStep.java | 113 +++++---- .../k8s/operator/utils/SparkAppStatusUtils.java | 5 + .../reconciler/SparkAppReconcilerTest.java | 57 +++++ .../reconcilesteps/AppCleanUpStepTest.java | 252 +++++++++++++++++++-- .../spark-state-transition-with-retain-check.yaml | 33 +++ .../resource-retain-duration/chainsaw-test.yaml | 56 +++++ .../spark-example-retain-duration.yaml | 34 +++ 12 files changed, 552 insertions(+), 54 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 15607ee..c52439d 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -82,6 +82,7 @@ jobs: - spark-versions - python - state-transition + - resource-retain-duration - watched-namespaces exclude: - mode: dynamic @@ -90,6 +91,8 @@ jobs: test-group: python - mode: dynamic test-group: state-transition + - mode: dynamic + test-group: resource-retain-duration - mode: static test-group: watched-namespaces steps: diff --git a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml index 0df44fa..21d3996 100644 --- a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml +++ b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml @@ -17411,6 +17411,9 @@ spec: minExecutors: type: integer type: object + resourceRetainDurationMillis: + default: -1 + type: integer resourceRetainPolicy: enum: - Always diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md index 5378c64..ad6f8b9 100644 --- a/docs/spark_custom_resources.md +++ b/docs/spark_custom_resources.md @@ -289,12 +289,17 @@ On the other hand, when developing an application, it's possible to configure ```yaml applicationTolerations: # 
Acceptable values are 'Always', 'OnFailure', 'Never' + # Setting this to 'OnFailure' would retain secondary resources if and only if the app fails resourceRetainPolicy: OnFailure + # Secondary resources would be garbage collected 10 minutes after app termination + resourceRetainDurationMillis: 600000 ``` to avoid operator attempt to delete driver pod and driver resources if app fails. Similarly, if resourceRetainPolicy is set to `Always`, operator would not delete driver resources -when app ends. Note that this applies only to operator-created resources (driver pod, SparkConf +when app ends. They would be by default kept with the same lifecycle as the App. It's also +possible to configure `resourceRetainDurationMillis` to define the maximal retain duration for +these resources. Note that this applies only to operator-created resources (driver pod, SparkConf configmap .etc). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination` and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created resources. diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java index 6815da3..3f7fff2 100644 --- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java +++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java @@ -63,6 +63,8 @@ public class Constants { + "observed status for details."; public static final String APP_CANCELLED_MESSAGE = "Spark application has been shutdown as requested."; + public static final String APP_EXCEEDED_RETAIN_DURATION_MESSAGE = + "Spark application resources released after exceeding the configured retain duration."; public static final String DRIVER_UNEXPECTED_REMOVED_MESSAGE = "Driver removed. This could caused by 'exit' called in driver process with non-zero " + "code, involuntary disruptions or unintentional destroy behavior, check " diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java index 8e2bef0..8e47817 100644 --- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java +++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java @@ -19,13 +19,18 @@ package org.apache.spark.k8s.operator.spec; +import java.time.Instant; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; +import io.fabric8.generator.annotation.Default; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import org.apache.spark.k8s.operator.status.ApplicationState; + /** Toleration settings for a Spark application. */ @Data @NoArgsConstructor @@ -43,4 +48,40 @@ public class ApplicationTolerations { @Builder.Default protected ExecutorInstanceConfig instanceConfig = new ExecutorInstanceConfig(); @Builder.Default protected ResourceRetainPolicy resourceRetainPolicy = ResourceRetainPolicy.Never; + + /** + * Time-to-live in milliseconds for secondary resources of SparkApplications after termination. If + * set to a negative value, secondary resources could be retained with the same lifecycle as the + * application according to the retain policy. 
+ */ + @Default("-1") + @Builder.Default + protected Long resourceRetainDurationMillis = -1L; + + /** + * Check whether a terminated application has exceeded the resource retain duration at the + * provided instant + * + * @param lastObservedState last observed state of the application + * @return true if the app has terminated and resource retain duration is configured to a positive + * value and the app is not within retain duration; false otherwise. + */ + public boolean exceedRetainDurationAtInstant( + ApplicationState lastObservedState, Instant instant) { + return lastObservedState != null + && lastObservedState.getCurrentStateSummary().isTerminated() + && resourceRetainDurationMillis > 0L + && Instant.parse(lastObservedState.getLastTransitionTime()) + .plusMillis(resourceRetainDurationMillis) + .isBefore(instant); + } + + /** + * Indicates whether the reconciler need to perform retain duration check + * + * @return true `resourceRetainDurationMillis` is set to non-negative value + */ + public boolean isRetainDurationEnabled() { + return resourceRetainDurationMillis >= 0L; + } } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java index f7b61bf..c62a506 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java @@ -30,7 +30,6 @@ import java.util.function.Supplier; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Pod; -import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -47,16 +46,22 @@ import org.apache.spark.k8s.operator.status.ApplicationStateSummary; import org.apache.spark.k8s.operator.status.ApplicationStatus; import org.apache.spark.k8s.operator.utils.ReconcilerUtils; import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder; +import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils; /** * Cleanup all secondary resources when application is deleted, or at the end of each attempt. * Update Application status to indicate whether another attempt would be made. 
*/ -@AllArgsConstructor @NoArgsConstructor @Slf4j public class AppCleanUpStep extends AppReconcileStep { private Supplier<ApplicationState> onDemandCleanUpReason; + private String stateUpdateMessage; + + public AppCleanUpStep(Supplier<ApplicationState> onDemandCleanUpReason) { + super(); + this.onDemandCleanUpReason = onDemandCleanUpReason; + } /** * Cleanup secondary resources for an application if needed and updates application status @@ -88,42 +93,37 @@ public class AppCleanUpStep extends AppReconcileStep { ApplicationStatus currentStatus = application.getStatus(); ApplicationState currentState = currentStatus.getCurrentState(); ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations(); - if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) { - statusRecorder.removeCachedStatus(application); - return ReconcileProgress.completeAndNoRequeue(); - } - String stateMessage = null; - if (isOnDemandCleanup()) { - log.info("Cleaning up application resources on demand"); - } else { - if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals( - currentState.getCurrentStateSummary())) { - statusRecorder.removeCachedStatus(application); - return ReconcileProgress.completeAndNoRequeue(); - } else if (currentState.getCurrentStateSummary().isStopping()) { - if (retainReleaseResourceForPolicyAndState( - tolerations.getResourceRetainPolicy(), currentState)) { - if (tolerations.getRestartConfig() != null - && !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) { - stateMessage = - "Application is configured to restart, resources created in current " - + "attempt would be force released."; - log.warn(stateMessage); - } else { - ApplicationState terminationState = - new ApplicationState( - ApplicationStateSummary.TerminatedWithoutReleaseResources, - "Application is terminated without releasing resources as configured."); - long requeueAfterMillis = - tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis(); - return appendStateAndRequeueAfter( - context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis)); - } + if (currentState.getCurrentStateSummary().isTerminated()) { + Optional<ReconcileProgress> terminatedAppProgress = + checkEarlyExitForTerminatedApp(application, statusRecorder); + if (terminatedAppProgress.isPresent()) { + return terminatedAppProgress.get(); + } + } else if (isOnDemandCleanup()) { + log.info("Releasing secondary resources for application on demand."); + } else if (currentState.getCurrentStateSummary().isStopping()) { + if (retainReleaseResourceForPolicyAndState( + tolerations.getResourceRetainPolicy(), currentState)) { + if (tolerations.getRestartConfig() != null + && !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) { + stateUpdateMessage = + "Application is configured to restart, resources created in current " + + "attempt would be force released."; + log.warn(stateUpdateMessage); + } else { + ApplicationState terminationState = + new ApplicationState( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + "Application is terminated without releasing resources as configured."); + long requeueAfterMillis = + tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis(); + return appendStateAndRequeueAfter( + context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis)); } - } else { - log.debug("Clean up is not expected for app, proceeding to next step."); - 
return ReconcileProgress.proceed(); } + } else { + log.debug("Clean up is not expected for app, proceeding to next step."); + return ReconcileProgress.proceed(); } List<HasMetadata> resourcesToRemove = new ArrayList<>(); @@ -159,8 +159,8 @@ public class AppCleanUpStep extends AppReconcileStep { ApplicationStatus updatedStatus; if (onDemandCleanUpReason != null) { ApplicationState state = onDemandCleanUpReason.get(); - if (StringUtils.isNotEmpty(stateMessage)) { - state.setMessage(stateMessage); + if (StringUtils.isNotEmpty(stateUpdateMessage)) { + state.setMessage(stateUpdateMessage); } long requeueAfterMillis = tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis(); @@ -171,7 +171,7 @@ public class AppCleanUpStep extends AppReconcileStep { currentStatus.terminateOrRestart( tolerations.getRestartConfig(), tolerations.getResourceRetainPolicy(), - stateMessage, + stateUpdateMessage, SparkOperatorConf.TRIM_ATTEMPT_STATE_TRANSITION_HISTORY.getValue()); long requeueAfterMillis = tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis(); @@ -184,6 +184,41 @@ public class AppCleanUpStep extends AppReconcileStep { } } + protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp( + final SparkApplication application, final SparkAppStatusRecorder statusRecorder) { + ApplicationStatus currentStatus = application.getStatus(); + ApplicationState currentState = currentStatus.getCurrentState(); + ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations(); + if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) { + statusRecorder.removeCachedStatus(application); + return Optional.of(ReconcileProgress.completeAndNoRequeue()); + } + if (isOnDemandCleanup()) { + return Optional.empty(); + } + if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals( + currentState.getCurrentStateSummary())) { + if (tolerations.isRetainDurationEnabled()) { + Instant now = Instant.now(); + if (tolerations.exceedRetainDurationAtInstant(currentState, now)) { + onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration; + return Optional.empty(); + } else { + Duration nextCheckDuration = + Duration.between( + Instant.now(), + Instant.parse(currentState.getLastTransitionTime()) + .plusMillis(tolerations.getResourceRetainDurationMillis())); + return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration)); + } + } else { + statusRecorder.removeCachedStatus(application); + return Optional.of(ReconcileProgress.completeAndNoRequeue()); + } + } + return Optional.empty(); + } + protected boolean isOnDemandCleanup() { return onDemandCleanUpReason != null; } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java index ecbe3e2..8b763bf 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java @@ -61,6 +61,11 @@ public final class SparkAppStatusUtils { ApplicationStateSummary.ResourceReleased, Constants.APP_CANCELLED_MESSAGE); } + public static ApplicationState appExceededRetainDuration() { + return new ApplicationState( + ApplicationStateSummary.ResourceReleased, Constants.APP_EXCEEDED_RETAIN_DURATION_MESSAGE); + } + public static boolean hasReachedState( SparkApplication application, ApplicationState 
stateToCheck) { return isValidApplicationStatus(application) diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java index d589765..ff7f371 100644 --- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java @@ -116,4 +116,61 @@ class SparkAppReconcilerTest { assertTrue(deleteControl.isRemoveFinalizer()); } } + + @SuppressWarnings("PMD.UnusedLocalVariable") + @Test + void testCleanupAppTerminatedWithoutReleaseResources() { + try (MockedConstruction<SparkAppContext> mockAppContext = + mockConstruction( + SparkAppContext.class, + (mock, context) -> { + when(mock.getResource()).thenReturn(app); + when(mock.getClient()).thenReturn(mockClient); + when(mock.getDriverPod()).thenReturn(Optional.of(mockDriver)); + when(mock.getDriverPodSpec()).thenReturn(mockDriver); + when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList()); + when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList()); + }); + MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) { + // delete app + app.setStatus( + app.getStatus() + .appendNewState( + new ApplicationState( + ApplicationStateSummary.TerminatedWithoutReleaseResources, ""))); + DeleteControl deleteControl = reconciler.cleanup(app, mockContext); + assertFalse(deleteControl.isRemoveFinalizer()); + utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, mockDriver, false)); + assertEquals( + ApplicationStateSummary.ResourceReleased, + app.getStatus().getCurrentState().getCurrentStateSummary()); + + // proceed delete for terminated app + deleteControl = reconciler.cleanup(app, mockContext); + assertTrue(deleteControl.isRemoveFinalizer()); + } + } + + @SuppressWarnings("PMD.UnusedLocalVariable") + @Test + void testCleanupAppTerminatedResourceReleased() { + try (MockedConstruction<SparkAppContext> mockAppContext = + mockConstruction( + SparkAppContext.class, + (mock, context) -> { + when(mock.getResource()).thenReturn(app); + when(mock.getClient()).thenReturn(mockClient); + when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList()); + when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList()); + }); + MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) { + // delete app + app.setStatus( + app.getStatus() + .appendNewState(new ApplicationState(ApplicationStateSummary.ResourceReleased, ""))); + DeleteControl deleteControl = reconciler.cleanup(app, mockContext); + assertTrue(deleteControl.isRemoveFinalizer()); + utils.verifyNoInteractions(); + } + } } diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java index 1c4637e..23ed54b 100644 --- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.when; import java.time.Duration; import java.time.Instant; import java.util.Collections; +import java.util.List; import java.util.Optional; import 
io.fabric8.kubernetes.api.model.ConfigMap; @@ -47,6 +48,8 @@ import org.apache.spark.k8s.operator.SparkApplication; import org.apache.spark.k8s.operator.context.SparkAppContext; import org.apache.spark.k8s.operator.reconciler.ReconcileProgress; import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.spec.ApplicationTolerations; +import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy; import org.apache.spark.k8s.operator.status.ApplicationState; import org.apache.spark.k8s.operator.status.ApplicationStateSummary; import org.apache.spark.k8s.operator.status.ApplicationStatus; @@ -56,6 +59,40 @@ import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils; @SuppressWarnings("PMD.NcssCount") class AppCleanUpStepTest { + private final ApplicationSpec alwaysRetain = + ApplicationSpec.builder() + .applicationTolerations( + ApplicationTolerations.builder() + .resourceRetainPolicy(ResourceRetainPolicy.Always) + .build()) + .build(); + private final ApplicationSpec neverRetain = + ApplicationSpec.builder() + .applicationTolerations( + ApplicationTolerations.builder() + .resourceRetainPolicy(ResourceRetainPolicy.Never) + .build()) + .build(); + private final ApplicationSpec exceedRetainDuration = + ApplicationSpec.builder() + .applicationTolerations( + ApplicationTolerations.builder() + .resourceRetainPolicy(ResourceRetainPolicy.Always) + .resourceRetainDurationMillis(1L) + .build()) + .build(); + private final ApplicationSpec notExceedRetainDuration = + ApplicationSpec.builder() + .applicationTolerations( + ApplicationTolerations.builder() + .resourceRetainPolicy(ResourceRetainPolicy.Always) + .resourceRetainDurationMillis(24 * 60 * 60 * 1000L) + .build()) + .build(); + + private final List<ApplicationSpec> specs = + List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration); + @Test void enableForceDelete() { AppCleanUpStep appCleanUpStep = new AppCleanUpStep(); @@ -120,7 +157,7 @@ class AppCleanUpStepTest { ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress); } - verify(mockAppContext).getResource(); + verify(mockAppContext, times(1)).getResource(); verify(mockApp, times(2)).getSpec(); verify(mockApp, times(2)).getStatus(); verify(mockAppContext).getClient(); @@ -151,9 +188,9 @@ class AppCleanUpStepTest { when(mockApp.getSpec()).thenReturn(spec); ReconcileProgress progress = routineCheck.reconcile(mockAppContext, mockRecorder); Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), progress); - verify(mockAppContext).getResource(); - verify(mockApp).getSpec(); - verify(mockApp).getStatus(); + verify(mockAppContext, times(1)).getResource(); + verify(mockApp, times(2)).getSpec(); + verify(mockApp, times(2)).getStatus(); verify(mockRecorder).removeCachedStatus(mockApp); verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp); } @@ -163,7 +200,7 @@ class AppCleanUpStepTest { @Test void onDemandCleanupForTerminatedAppExpectNoAction() { SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class); - AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); ApplicationStatus status = prepareApplicationStatus(ApplicationStateSummary.ResourceReleased); SparkApplication mockApp = mock(SparkApplication.class); ApplicationSpec spec = ApplicationSpec.builder().build(); @@ -171,11 +208,11 @@ class AppCleanUpStepTest { SparkAppContext mockAppContext = mock(SparkAppContext.class); 
when(mockAppContext.getResource()).thenReturn(mockApp); when(mockApp.getSpec()).thenReturn(spec); - ReconcileProgress progress = routineCheck.reconcile(mockAppContext, mockRecorder); + ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder); Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), progress); - verify(mockAppContext).getResource(); - verify(mockApp).getSpec(); - verify(mockApp).getStatus(); + verify(mockAppContext, times(1)).getResource(); + verify(mockApp, times(2)).getSpec(); + verify(mockApp, times(2)).getStatus(); verify(mockRecorder).removeCachedStatus(mockApp); verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp); } @@ -206,9 +243,9 @@ class AppCleanUpStepTest { ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress); } - verify(mockAppContext).getResource(); - verify(mockApp, times(2)).getSpec(); - verify(mockApp, times(2)).getStatus(); + verify(mockAppContext, times(1)).getResource(); + verify(mockApp, times(3)).getSpec(); + verify(mockApp, times(3)).getStatus(); verify(mockAppContext).getClient(); verify(mockAppContext).getDriverPod(); ArgumentCaptor<ApplicationState> captor = ArgumentCaptor.forClass(ApplicationState.class); @@ -269,14 +306,14 @@ class AppCleanUpStepTest { ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress2); } - verify(mockAppContext1).getResource(); + verify(mockAppContext1, times(1)).getResource(); verify(mockApp1, times(2)).getSpec(); verify(mockApp1, times(2)).getStatus(); verify(mockAppContext1, times(3)).getClient(); verify(mockAppContext1).getDriverPreResourcesSpec(); verify(mockAppContext1).getDriverPodSpec(); verify(mockAppContext1).getDriverResourcesSpec(); - verify(mockAppContext2).getResource(); + verify(mockAppContext2, times(1)).getResource(); verify(mockApp2, times(2)).getSpec(); verify(mockApp2, times(2)).getStatus(); verify(mockAppContext2, times(3)).getClient(); @@ -299,9 +336,195 @@ class AppCleanUpStepTest { mockAppContext1, mockAppContext2, mockRecorder, mockApp1, mockApp2, mockClient, driverPod); } + @Test + void checkEarlyExitForResourceReleasedApp() { + AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus succeeded = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Succeeded); + ApplicationStatus failed = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.SchedulingFailure); + ApplicationStatus cancelled = + prepareApplicationStatus( + ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.RunningHealthy); + List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + + for (ApplicationSpec appSpec : specs) { + for (ApplicationStatus appStatus : statusList) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(appStatus); + when(mockApp.getSpec()).thenReturn(appSpec); + + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + assertTrue(routineCheckProgress.isPresent()); + Assertions.assertEquals( + ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get()); + verify(mockRecorder1).removeCachedStatus(mockApp); + + 
Optional<ReconcileProgress> onDemandProgress = + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + assertTrue(onDemandProgress.isPresent()); + Assertions.assertEquals( + ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get()); + verify(mockRecorder2).removeCachedStatus(mockApp); + } + } + } + + @Test + void checkEarlyExitForAppTerminatedWithoutReleaseResourcesInfiniteRetain() { + AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus succeeded = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.Succeeded); + ApplicationStatus failed = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.SchedulingFailure); + ApplicationStatus cancelled = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.RunningHealthy); + List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + + for (ApplicationStatus appStatus : statusList) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(appStatus); + when(mockApp.getSpec()).thenReturn(alwaysRetain); + + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + assertTrue(routineCheckProgress.isPresent()); + Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get()); + verify(mockRecorder1).removeCachedStatus(mockApp); + + Optional<ReconcileProgress> onDemandProgress = + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + assertFalse(onDemandProgress.isPresent()); + verifyNoMoreInteractions(mockRecorder2); + } + } + + @Test + void checkEarlyExitForAppTerminatedWithoutReleaseResourcesExceededRetainDuration() { + AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus succeeded = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.Succeeded); + ApplicationStatus failed = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.SchedulingFailure); + ApplicationStatus cancelled = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.RunningHealthy); + List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + + for (ApplicationStatus appStatus : statusList) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(appStatus); + when(mockApp.getSpec()).thenReturn(exceedRetainDuration); + + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + assertFalse(routineCheckProgress.isPresent()); + verifyNoMoreInteractions(mockRecorder1); + + Optional<ReconcileProgress> onDemandProgress = + 
cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + assertFalse(onDemandProgress.isPresent()); + verifyNoMoreInteractions(mockRecorder2); + } + } + + @Test + void checkEarlyExitForAppTerminatedWithoutReleaseResourcesWithinRetainDuration() { + AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus succeeded = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.Succeeded); + ApplicationStatus failed = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.SchedulingFailure); + ApplicationStatus cancelled = + prepareApplicationStatus( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + ApplicationStateSummary.RunningHealthy); + List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled); + + for (ApplicationStatus appStatus : statusList) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(appStatus); + when(mockApp.getSpec()).thenReturn(notExceedRetainDuration); + + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + assertTrue(routineCheckProgress.isPresent()); + ReconcileProgress reconcileProgress = routineCheckProgress.get(); + assertTrue(reconcileProgress.isCompleted()); + assertTrue(reconcileProgress.isRequeue()); + verifyNoMoreInteractions(mockRecorder2); + + Optional<ReconcileProgress> onDemandProgress = + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + assertFalse(onDemandProgress.isPresent()); + verifyNoMoreInteractions(mockRecorder2); + } + } + + @Test + void checkEarlyExitForNotTerminatedApp() { + AppCleanUpStep routineCheck = new AppCleanUpStep(); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + for (ApplicationStateSummary stateSummary : ApplicationStateSummary.values()) { + if (stateSummary.isTerminated()) { + continue; + } + ApplicationStatus status = prepareApplicationStatus(stateSummary); + for (ApplicationSpec appSpec : specs) { + SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class); + SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(status); + when(mockApp.getSpec()).thenReturn(appSpec); + + Optional<ReconcileProgress> routineCheckProgress = + routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1); + assertTrue(routineCheckProgress.isEmpty()); + verifyNoMoreInteractions(mockRecorder1); + + Optional<ReconcileProgress> onDemandProgress = + cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2); + assertTrue(onDemandProgress.isEmpty()); + verifyNoMoreInteractions(mockRecorder2); + } + } + } + private ApplicationStatus prepareApplicationStatus(ApplicationStateSummary currentStateSummary) { ApplicationStatus status = new ApplicationStatus(); ApplicationState state = new ApplicationState(currentStateSummary, "foo"); + // to make sure the state exceeds threshold + state.setLastTransitionTime(Instant.now().minusSeconds(10).toString()); return status.appendNewState(state); } @@ -309,6 +532,7 
@@ class AppCleanUpStepTest { ApplicationStateSummary currentStateSummary, ApplicationStateSummary previousStateSummary) { ApplicationStatus status = prepareApplicationStatus(previousStateSummary); ApplicationState state = new ApplicationState(currentStateSummary, "foo"); + state.setLastTransitionTime(Instant.now().minusSeconds(5).toString()); return status.appendNewState(state); } diff --git a/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml b/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml new file mode 100644 index 0000000..4460291 --- /dev/null +++ b/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: spark.apache.org/v1 +kind: SparkApplication +metadata: + name: spark-example-retain-duration + namespace: ($SPARK_APP_NAMESPACE) +status: + stateTransitionHistory: + (*.currentStateSummary): + - "Submitted" + - "DriverRequested" + - "DriverStarted" + - "DriverReady" + - "RunningHealthy" + - "Succeeded" + - "TerminatedWithoutReleaseResources" + - "ResourceReleased" diff --git a/tests/e2e/resource-retain-duration/chainsaw-test.yaml b/tests/e2e/resource-retain-duration/chainsaw-test.yaml new file mode 100644 index 0000000..2c8b558 --- /dev/null +++ b/tests/e2e/resource-retain-duration/chainsaw-test.yaml @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +apiVersion: chainsaw.kyverno.io/v1alpha1 +kind: Test +metadata: + name: garbage-collect-with-retain-duration-test +spec: + scenarios: + - bindings: + - name: TEST_NAME + value: succeeded + - name: APPLICATION_FILE_NAME + value: spark-example-retain-duration.yaml + - name: SPARK_APPLICATION_NAME + value: spark-example-retain-duration + steps: + - try: + - script: + env: + - name: FILE_NAME + value: ($APPLICATION_FILE_NAME) + content: kubectl apply -f $FILE_NAME + - assert: + bindings: + - name: SPARK_APP_NAMESPACE + value: default + timeout: 120s + file: "../assertions/spark-application/spark-state-transition-with-retain-check.yaml" + catch: + - describe: + apiVersion: spark.apache.org/v1 + kind: SparkApplication + namespace: default + finally: + - script: + env: + - name: SPARK_APPLICATION_NAME + value: ($SPARK_APPLICATION_NAME) + timeout: 120s + content: | + kubectl delete sparkapplication $SPARK_APPLICATION_NAME diff --git a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml new file mode 100644 index 0000000..022fdd4 --- /dev/null +++ b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: spark.apache.org/v1 +kind: SparkApplication +metadata: + name: spark-example-retain-duration + namespace: default +spec: + mainClass: "org.apache.spark.examples.SparkPi" + jars: "local:///opt/spark/examples/jars/spark-examples.jar" + applicationTolerations: + resourceRetainPolicy: Always + resourceRetainDurationMillis: 10000 + sparkConf: + spark.executor.instances: "1" + spark.kubernetes.container.image: "apache/spark:4.0.0-java21-scala" + spark.kubernetes.authenticate.driver.serviceAccountName: "spark" + runtimeVersions: + sparkVersion: 4.0.0 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
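For readers skimming the diff above, here is a minimal, self-contained sketch of the retain-duration semantics this commit introduces: a non-positive `resourceRetainDurationMillis` leaves secondary resources governed solely by the retain policy, a positive value releases them once the last state transition is older than the configured duration, and an app still inside the window is requeued until the window closes. The class and method names below (`RetainDurationSketch`, `exceededRetainDuration`, `remainingRetainWindow`) are hypothetical illustrations only; the actual logic lives in `ApplicationTolerations#exceedRetainDurationAtInstant` and `AppCleanUpStep#checkEarlyExitForTerminatedApp` shown in the diff.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch (hypothetical class, not operator code) of the retain-duration
// check added by this commit.
public final class RetainDurationSketch {

  // Roughly mirrors ApplicationTolerations#exceedRetainDurationAtInstant:
  // only a positive duration can expire, measured from the app's last state transition.
  static boolean exceededRetainDuration(Instant lastTransitionTime, long retainMillis, Instant now) {
    return retainMillis > 0L && lastTransitionTime.plusMillis(retainMillis).isBefore(now);
  }

  // Roughly mirrors the requeue path in AppCleanUpStep#checkEarlyExitForTerminatedApp:
  // while the app is still inside the retain window, reconcile again when the window closes.
  static Duration remainingRetainWindow(Instant lastTransitionTime, long retainMillis, Instant now) {
    return Duration.between(now, lastTransitionTime.plusMillis(retainMillis));
  }

  public static void main(String[] args) {
    Instant terminatedAt = Instant.now().minusSeconds(900); // app terminated 15 minutes ago
    long tenMinutes = 600_000L;                             // resourceRetainDurationMillis: 600000

    // Past the 10-minute window, so secondary resources are due for release.
    System.out.println(exceededRetainDuration(terminatedAt, tenMinutes, Instant.now()));

    // With a 24-hour window the app is still retained; print the time left before cleanup.
    long oneDay = 24 * 60 * 60 * 1000L;
    System.out.println(remainingRetainWindow(terminatedAt, oneDay, Instant.now()));
  }
}
```

This mirrors the e2e scenario bundled in the commit, where `resourceRetainPolicy: Always` with `resourceRetainDurationMillis: 10000` keeps resources in `TerminatedWithoutReleaseResources` for 10 seconds before the operator transitions the app to `ResourceReleased`.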