This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 2895467  [SPARK-52648] Add support for maximal retain duration for Spark application resources
2895467 is described below

commit 28954679f2e9ec352054aba09f8c4bde2f209ed2
Author: Zhou JIANG <jiang...@umich.edu>
AuthorDate: Fri Jul 11 19:52:08 2025 -0700

    [SPARK-52648] Add support for maximal retain duration for Spark application resources
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for configuring a maximal retain duration for Spark
    application resources. It works together with `resourceRetainPolicy` to
    strengthen the garbage collection mechanism.
    
    ### Why are the changes needed?
    
    The current `resourceRetainPolicy` provides flexibility to retain Spark app
    resources after the app has terminated. Introducing a maximal retain
    duration adds a layer of protection that prevents terminated resources
    (pods, config maps, etc.) from consuming quota in the cluster.
    
    ### Does this PR introduce _any_ user-facing change?
    
    A new configurable field,
    `spec.applicationTolerations.resourceRetainDurationMillis`, is added to the
    SparkApplication CRD. It defaults to `-1`, in which case no maximal retain
    duration is enforced.
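    
    The expiry check behind this field can be sketched roughly as follows. This
    is a minimal, standalone illustration that mirrors the comparison made in
    `exceedRetainDurationAtInstant` in this commit. The class and method names
    (`RetainDurationSketch`, `exceeded`, `remaining`) are hypothetical and are
    not part of the operator, and clamping the remaining time to zero is an
    assumption of this sketch rather than something the commit does.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical standalone sketch; not part of the operator code base.
public class RetainDurationSketch {

  /** True when a positive retain duration has fully elapsed since termination. */
  public static boolean exceeded(Instant terminatedAt, long retainMillis, Instant now) {
    // Non-positive values never report expiry, as in the committed check.
    return retainMillis > 0L && terminatedAt.plusMillis(retainMillis).isBefore(now);
  }

  /** Time left until resources become eligible for cleanup (clamped to zero: an assumption). */
  public static Duration remaining(Instant terminatedAt, long retainMillis, Instant now) {
    Duration left = Duration.between(now, terminatedAt.plusMillis(retainMillis));
    return left.isNegative() ? Duration.ZERO : left;
  }

  public static void main(String[] args) {
    Instant terminatedAt = Instant.parse("2025-07-11T00:00:00Z");
    long tenMinutes = 600_000L;
    Instant fiveMinutesLater = terminatedAt.plusSeconds(300);
    Instant elevenMinutesLater = terminatedAt.plusSeconds(660);

    System.out.println(exceeded(terminatedAt, tenMinutes, fiveMinutesLater));   // false
    System.out.println(exceeded(terminatedAt, tenMinutes, elevenMinutesLater)); // true
    System.out.println(exceeded(terminatedAt, -1L, elevenMinutesLater));        // false
    System.out.println(remaining(terminatedAt, tenMinutes, fiveMinutesLater));  // PT5M
  }
}
```

    With `resourceRetainDurationMillis: 600000`, an app that terminated at
    00:00:00 would keep its retained resources until 00:10:00 and become
    eligible for cleanup on the first reconciliation after that.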
    
    ### How was this patch tested?
    
    CI, including new unit tests and an e2e scenario.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #268 from jiangzho/retentionDuration.
    
    Authored-by: Zhou JIANG <jiang...@umich.edu>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .github/workflows/build_and_test.yml               |   3 +
 .../sparkapplications.spark.apache.org-v1.yaml     |   3 +
 docs/spark_custom_resources.md                     |   7 +-
 .../org/apache/spark/k8s/operator/Constants.java   |   2 +
 .../k8s/operator/spec/ApplicationTolerations.java  |  41 ++++
 .../reconciler/reconcilesteps/AppCleanUpStep.java  | 113 +++++----
 .../k8s/operator/utils/SparkAppStatusUtils.java    |   5 +
 .../reconciler/SparkAppReconcilerTest.java         |  57 +++++
 .../reconcilesteps/AppCleanUpStepTest.java         | 252 +++++++++++++++++++--
 .../spark-state-transition-with-retain-check.yaml  |  33 +++
 .../resource-retain-duration/chainsaw-test.yaml    |  56 +++++
 .../spark-example-retain-duration.yaml             |  34 +++
 12 files changed, 552 insertions(+), 54 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 15607ee..c52439d 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -82,6 +82,7 @@ jobs:
           - spark-versions
           - python
           - state-transition
+          - resource-retain-duration
           - watched-namespaces
         exclude:
           - mode: dynamic
@@ -90,6 +91,8 @@ jobs:
             test-group: python
           - mode: dynamic
             test-group: state-transition
+          - mode: dynamic
+            test-group: resource-retain-duration
           - mode: static
             test-group: watched-namespaces
     steps:
diff --git 
a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
 
b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
index 0df44fa..21d3996 100644
--- 
a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
+++ 
b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
@@ -17411,6 +17411,9 @@ spec:
                         minExecutors:
                           type: integer
                       type: object
+                    resourceRetainDurationMillis:
+                      default: -1
+                      type: integer
                     resourceRetainPolicy:
                       enum:
                         - Always
diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md
index 5378c64..ad6f8b9 100644
--- a/docs/spark_custom_resources.md
+++ b/docs/spark_custom_resources.md
@@ -289,12 +289,17 @@ On the other hand, when developing an application, it's 
possible to configure
 ```yaml
 applicationTolerations:
   # Acceptable values are 'Always', 'OnFailure', 'Never'
+  # Setting this to 'OnFailure' would retain secondary resources if and only 
if the app fails
   resourceRetainPolicy: OnFailure
+  # Secondary resources would be garbage collected 10 minutes after app 
termination 
+  resourceRetainDurationMillis: 600000
 ```
 
 to avoid operator attempt to delete driver pod and driver resources if app 
fails. Similarly,
 if resourceRetainPolicy is set to `Always`, operator would not delete driver 
resources
-when app ends. Note that this applies only to operator-created resources 
(driver pod, SparkConf
+when app ends. They would be by default kept with the same lifecycle as the 
App. It's also
+possible to configure `resourceRetainDurationMillis` to define the maximal 
retain duration for
+these resources. Note that this applies only to operator-created resources 
(driver pod, SparkConf
 configmap .etc). You may also want to tune 
`spark.kubernetes.driver.service.deleteOnTermination`
 and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of 
driver-created
 resources.
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index 6815da3..3f7fff2 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -63,6 +63,8 @@ public class Constants {
           + "observed status for details.";
   public static final String APP_CANCELLED_MESSAGE =
       "Spark application has been shutdown as requested.";
+  public static final String APP_EXCEEDED_RETAIN_DURATION_MESSAGE =
+      "Spark application resources released after exceeding the configured 
retain duration.";
   public static final String DRIVER_UNEXPECTED_REMOVED_MESSAGE =
       "Driver removed. This could caused by 'exit' called in driver process 
with non-zero "
           + "code, involuntary disruptions or unintentional destroy behavior, 
check "
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
index 8e2bef0..8e47817 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
@@ -19,13 +19,18 @@
 
 package org.apache.spark.k8s.operator.spec;
 
+import java.time.Instant;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
+import io.fabric8.generator.annotation.Default;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import org.apache.spark.k8s.operator.status.ApplicationState;
+
 /** Toleration settings for a Spark application. */
 @Data
 @NoArgsConstructor
@@ -43,4 +48,40 @@ public class ApplicationTolerations {
   @Builder.Default protected ExecutorInstanceConfig instanceConfig = new 
ExecutorInstanceConfig();
 
   @Builder.Default protected ResourceRetainPolicy resourceRetainPolicy = 
ResourceRetainPolicy.Never;
+
+  /**
+   * Time-to-live in milliseconds for secondary resources of SparkApplications 
after termination. If
+   * set to a negative value, secondary resources could be retained with the 
same lifecycle as the
+   * application according to the retain policy.
+   */
+  @Default("-1")
+  @Builder.Default
+  protected Long resourceRetainDurationMillis = -1L;
+
+  /**
+   * Check whether a terminated application has exceeded the resource retain 
duration at the
+   * provided instant
+   *
+   * @param lastObservedState last observed state of the application
+   * @return true if the app has terminated and resource retain duration is 
configured to a positive
+   *     value and the app is not within retain duration; false otherwise.
+   */
+  public boolean exceedRetainDurationAtInstant(
+      ApplicationState lastObservedState, Instant instant) {
+    return lastObservedState != null
+        && lastObservedState.getCurrentStateSummary().isTerminated()
+        && resourceRetainDurationMillis > 0L
+        && Instant.parse(lastObservedState.getLastTransitionTime())
+            .plusMillis(resourceRetainDurationMillis)
+            .isBefore(instant);
+  }
+
+  /**
+   * Indicates whether the reconciler needs to perform the retain duration check.
+   *
+   * @return true if `resourceRetainDurationMillis` is set to a non-negative value
+   */
+  public boolean isRetainDurationEnabled() {
+    return resourceRetainDurationMillis >= 0L;
+  }
 }
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
index f7b61bf..c62a506 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
@@ -30,7 +30,6 @@ import java.util.function.Supplier;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -47,16 +46,22 @@ import 
org.apache.spark.k8s.operator.status.ApplicationStateSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.k8s.operator.utils.ReconcilerUtils;
 import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
+import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;
 
 /**
  * Cleanup all secondary resources when application is deleted, or at the end 
of each attempt.
  * Update Application status to indicate whether another attempt would be made.
  */
-@AllArgsConstructor
 @NoArgsConstructor
 @Slf4j
 public class AppCleanUpStep extends AppReconcileStep {
   private Supplier<ApplicationState> onDemandCleanUpReason;
+  private String stateUpdateMessage;
+
+  public AppCleanUpStep(Supplier<ApplicationState> onDemandCleanUpReason) {
+    super();
+    this.onDemandCleanUpReason = onDemandCleanUpReason;
+  }
 
   /**
    * Cleanup secondary resources for an application if needed and updates 
application status
@@ -88,42 +93,37 @@ public class AppCleanUpStep extends AppReconcileStep {
     ApplicationStatus currentStatus = application.getStatus();
     ApplicationState currentState = currentStatus.getCurrentState();
     ApplicationTolerations tolerations = 
application.getSpec().getApplicationTolerations();
-    if 
(ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary()))
 {
-      statusRecorder.removeCachedStatus(application);
-      return ReconcileProgress.completeAndNoRequeue();
-    }
-    String stateMessage = null;
-    if (isOnDemandCleanup()) {
-      log.info("Cleaning up application resources on demand");
-    } else {
-      if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
-          currentState.getCurrentStateSummary())) {
-        statusRecorder.removeCachedStatus(application);
-        return ReconcileProgress.completeAndNoRequeue();
-      } else if (currentState.getCurrentStateSummary().isStopping()) {
-        if (retainReleaseResourceForPolicyAndState(
-            tolerations.getResourceRetainPolicy(), currentState)) {
-          if (tolerations.getRestartConfig() != null
-              && 
!RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) 
{
-            stateMessage =
-                "Application is configured to restart, resources created in 
current "
-                    + "attempt would be force released.";
-            log.warn(stateMessage);
-          } else {
-            ApplicationState terminationState =
-                new ApplicationState(
-                    ApplicationStateSummary.TerminatedWithoutReleaseResources,
-                    "Application is terminated without releasing resources as 
configured.");
-            long requeueAfterMillis =
-                
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
-            return appendStateAndRequeueAfter(
-                context, statusRecorder, terminationState, 
Duration.ofMillis(requeueAfterMillis));
-          }
+    if (currentState.getCurrentStateSummary().isTerminated()) {
+      Optional<ReconcileProgress> terminatedAppProgress =
+          checkEarlyExitForTerminatedApp(application, statusRecorder);
+      if (terminatedAppProgress.isPresent()) {
+        return terminatedAppProgress.get();
+      }
+    } else if (isOnDemandCleanup()) {
+      log.info("Releasing secondary resources for application on demand.");
+    } else if (currentState.getCurrentStateSummary().isStopping()) {
+      if (retainReleaseResourceForPolicyAndState(
+          tolerations.getResourceRetainPolicy(), currentState)) {
+        if (tolerations.getRestartConfig() != null
+            && 
!RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) 
{
+          stateUpdateMessage =
+              "Application is configured to restart, resources created in 
current "
+                  + "attempt would be force released.";
+          log.warn(stateUpdateMessage);
+        } else {
+          ApplicationState terminationState =
+              new ApplicationState(
+                  ApplicationStateSummary.TerminatedWithoutReleaseResources,
+                  "Application is terminated without releasing resources as 
configured.");
+          long requeueAfterMillis =
+              
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
+          return appendStateAndRequeueAfter(
+              context, statusRecorder, terminationState, 
Duration.ofMillis(requeueAfterMillis));
         }
-      } else {
-        log.debug("Clean up is not expected for app, proceeding to next 
step.");
-        return ReconcileProgress.proceed();
       }
+    } else {
+      log.debug("Clean up is not expected for app, proceeding to next step.");
+      return ReconcileProgress.proceed();
     }
 
     List<HasMetadata> resourcesToRemove = new ArrayList<>();
@@ -159,8 +159,8 @@ public class AppCleanUpStep extends AppReconcileStep {
     ApplicationStatus updatedStatus;
     if (onDemandCleanUpReason != null) {
       ApplicationState state = onDemandCleanUpReason.get();
-      if (StringUtils.isNotEmpty(stateMessage)) {
-        state.setMessage(stateMessage);
+      if (StringUtils.isNotEmpty(stateUpdateMessage)) {
+        state.setMessage(stateUpdateMessage);
       }
       long requeueAfterMillis =
           
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -171,7 +171,7 @@ public class AppCleanUpStep extends AppReconcileStep {
           currentStatus.terminateOrRestart(
               tolerations.getRestartConfig(),
               tolerations.getResourceRetainPolicy(),
-              stateMessage,
+              stateUpdateMessage,
               
SparkOperatorConf.TRIM_ATTEMPT_STATE_TRANSITION_HISTORY.getValue());
       long requeueAfterMillis =
           
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -184,6 +184,41 @@ public class AppCleanUpStep extends AppReconcileStep {
     }
   }
 
+  protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
+      final SparkApplication application, final SparkAppStatusRecorder 
statusRecorder) {
+    ApplicationStatus currentStatus = application.getStatus();
+    ApplicationState currentState = currentStatus.getCurrentState();
+    ApplicationTolerations tolerations = 
application.getSpec().getApplicationTolerations();
+    if 
(ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary()))
 {
+      statusRecorder.removeCachedStatus(application);
+      return Optional.of(ReconcileProgress.completeAndNoRequeue());
+    }
+    if (isOnDemandCleanup()) {
+      return Optional.empty();
+    }
+    if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
+        currentState.getCurrentStateSummary())) {
+      if (tolerations.isRetainDurationEnabled()) {
+        Instant now = Instant.now();
+        if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
+          onDemandCleanUpReason = 
SparkAppStatusUtils::appExceededRetainDuration;
+          return Optional.empty();
+        } else {
+          Duration nextCheckDuration =
+              Duration.between(
+                  now,
+                  Instant.parse(currentState.getLastTransitionTime())
+                      
.plusMillis(tolerations.getResourceRetainDurationMillis()));
+          return 
Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
+        }
+      } else {
+        statusRecorder.removeCachedStatus(application);
+        return Optional.of(ReconcileProgress.completeAndNoRequeue());
+      }
+    }
+    return Optional.empty();
+  }
+
   protected boolean isOnDemandCleanup() {
     return onDemandCleanUpReason != null;
   }
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
index ecbe3e2..8b763bf 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
@@ -61,6 +61,11 @@ public final class SparkAppStatusUtils {
         ApplicationStateSummary.ResourceReleased, 
Constants.APP_CANCELLED_MESSAGE);
   }
 
+  public static ApplicationState appExceededRetainDuration() {
+    return new ApplicationState(
+        ApplicationStateSummary.ResourceReleased, 
Constants.APP_EXCEEDED_RETAIN_DURATION_MESSAGE);
+  }
+
   public static boolean hasReachedState(
       SparkApplication application, ApplicationState stateToCheck) {
     return isValidApplicationStatus(application)
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
index d589765..ff7f371 100644
--- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
@@ -116,4 +116,61 @@ class SparkAppReconcilerTest {
       assertTrue(deleteControl.isRemoveFinalizer());
     }
   }
+
+  @SuppressWarnings("PMD.UnusedLocalVariable")
+  @Test
+  void testCleanupAppTerminatedWithoutReleaseResources() {
+    try (MockedConstruction<SparkAppContext> mockAppContext =
+            mockConstruction(
+                SparkAppContext.class,
+                (mock, context) -> {
+                  when(mock.getResource()).thenReturn(app);
+                  when(mock.getClient()).thenReturn(mockClient);
+                  
when(mock.getDriverPod()).thenReturn(Optional.of(mockDriver));
+                  when(mock.getDriverPodSpec()).thenReturn(mockDriver);
+                  
when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+                  
when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+                });
+        MockedStatic<ReconcilerUtils> utils = 
Mockito.mockStatic(ReconcilerUtils.class)) {
+      // delete app
+      app.setStatus(
+          app.getStatus()
+              .appendNewState(
+                  new ApplicationState(
+                      
ApplicationStateSummary.TerminatedWithoutReleaseResources, "")));
+      DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
+      assertFalse(deleteControl.isRemoveFinalizer());
+      utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, 
mockDriver, false));
+      assertEquals(
+          ApplicationStateSummary.ResourceReleased,
+          app.getStatus().getCurrentState().getCurrentStateSummary());
+
+      // proceed delete for terminated app
+      deleteControl = reconciler.cleanup(app, mockContext);
+      assertTrue(deleteControl.isRemoveFinalizer());
+    }
+  }
+
+  @SuppressWarnings("PMD.UnusedLocalVariable")
+  @Test
+  void testCleanupAppTerminatedResourceReleased() {
+    try (MockedConstruction<SparkAppContext> mockAppContext =
+            mockConstruction(
+                SparkAppContext.class,
+                (mock, context) -> {
+                  when(mock.getResource()).thenReturn(app);
+                  when(mock.getClient()).thenReturn(mockClient);
+                  
when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+                  
when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+                });
+        MockedStatic<ReconcilerUtils> utils = 
Mockito.mockStatic(ReconcilerUtils.class)) {
+      // delete app
+      app.setStatus(
+          app.getStatus()
+              .appendNewState(new 
ApplicationState(ApplicationStateSummary.ResourceReleased, "")));
+      DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
+      assertTrue(deleteControl.isRemoveFinalizer());
+      utils.verifyNoInteractions();
+    }
+  }
 }
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
index 1c4637e..23ed54b 100644
--- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -47,6 +48,8 @@ import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.context.SparkAppContext;
 import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.ApplicationTolerations;
+import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
 import org.apache.spark.k8s.operator.status.ApplicationState;
 import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
@@ -56,6 +59,40 @@ import 
org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;
 
 @SuppressWarnings("PMD.NcssCount")
 class AppCleanUpStepTest {
+  private final ApplicationSpec alwaysRetain =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .build())
+          .build();
+  private final ApplicationSpec neverRetain =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Never)
+                  .build())
+          .build();
+  private final ApplicationSpec exceedRetainDuration =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .resourceRetainDurationMillis(1L)
+                  .build())
+          .build();
+  private final ApplicationSpec notExceedRetainDuration =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .resourceRetainDurationMillis(24 * 60 * 60 * 1000L)
+                  .build())
+          .build();
+
+  private final List<ApplicationSpec> specs =
+      List.of(alwaysRetain, neverRetain, exceedRetainDuration, 
notExceedRetainDuration);
+
   @Test
   void enableForceDelete() {
     AppCleanUpStep appCleanUpStep = new AppCleanUpStep();
@@ -120,7 +157,7 @@ class AppCleanUpStepTest {
               
ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress);
         }
 
-        verify(mockAppContext).getResource();
+        verify(mockAppContext, times(1)).getResource();
         verify(mockApp, times(2)).getSpec();
         verify(mockApp, times(2)).getStatus();
         verify(mockAppContext).getClient();
@@ -151,9 +188,9 @@ class AppCleanUpStepTest {
         when(mockApp.getSpec()).thenReturn(spec);
         ReconcileProgress progress = routineCheck.reconcile(mockAppContext, 
mockRecorder);
         Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), 
progress);
-        verify(mockAppContext).getResource();
-        verify(mockApp).getSpec();
-        verify(mockApp).getStatus();
+        verify(mockAppContext, times(1)).getResource();
+        verify(mockApp, times(2)).getSpec();
+        verify(mockApp, times(2)).getStatus();
         verify(mockRecorder).removeCachedStatus(mockApp);
         verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
       }
@@ -163,7 +200,7 @@ class AppCleanUpStepTest {
   @Test
   void onDemandCleanupForTerminatedAppExpectNoAction() {
     SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
-    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new 
AppCleanUpStep(SparkAppStatusUtils::appCancelled);
     ApplicationStatus status = 
prepareApplicationStatus(ApplicationStateSummary.ResourceReleased);
     SparkApplication mockApp = mock(SparkApplication.class);
     ApplicationSpec spec = ApplicationSpec.builder().build();
@@ -171,11 +208,11 @@ class AppCleanUpStepTest {
     SparkAppContext mockAppContext = mock(SparkAppContext.class);
     when(mockAppContext.getResource()).thenReturn(mockApp);
     when(mockApp.getSpec()).thenReturn(spec);
-    ReconcileProgress progress = routineCheck.reconcile(mockAppContext, 
mockRecorder);
+    ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, 
mockRecorder);
     Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), 
progress);
-    verify(mockAppContext).getResource();
-    verify(mockApp).getSpec();
-    verify(mockApp).getStatus();
+    verify(mockAppContext, times(1)).getResource();
+    verify(mockApp, times(2)).getSpec();
+    verify(mockApp, times(2)).getStatus();
     verify(mockRecorder).removeCachedStatus(mockApp);
     verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
   }
@@ -206,9 +243,9 @@ class AppCleanUpStepTest {
           ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), 
progress);
     }
 
-    verify(mockAppContext).getResource();
-    verify(mockApp, times(2)).getSpec();
-    verify(mockApp, times(2)).getStatus();
+    verify(mockAppContext, times(1)).getResource();
+    verify(mockApp, times(3)).getSpec();
+    verify(mockApp, times(3)).getStatus();
     verify(mockAppContext).getClient();
     verify(mockAppContext).getDriverPod();
     ArgumentCaptor<ApplicationState> captor = 
ArgumentCaptor.forClass(ApplicationState.class);
@@ -269,14 +306,14 @@ class AppCleanUpStepTest {
           ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), 
progress2);
     }
 
-    verify(mockAppContext1).getResource();
+    verify(mockAppContext1, times(1)).getResource();
     verify(mockApp1, times(2)).getSpec();
     verify(mockApp1, times(2)).getStatus();
     verify(mockAppContext1, times(3)).getClient();
     verify(mockAppContext1).getDriverPreResourcesSpec();
     verify(mockAppContext1).getDriverPodSpec();
     verify(mockAppContext1).getDriverResourcesSpec();
-    verify(mockAppContext2).getResource();
+    verify(mockAppContext2, times(1)).getResource();
     verify(mockApp2, times(2)).getSpec();
     verify(mockApp2, times(2)).getStatus();
     verify(mockAppContext2, times(3)).getClient();
@@ -299,9 +336,195 @@ class AppCleanUpStepTest {
         mockAppContext1, mockAppContext2, mockRecorder, mockApp1, mockApp2, 
mockClient, driverPod);
   }
 
+  @Test
+  void checkEarlyExitForResourceReleasedApp() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new 
AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, 
ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, 
ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, 
ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationSpec appSpec : specs) {
+      for (ApplicationStatus appStatus : statusList) {
+        SparkAppStatusRecorder mockRecorder1 = 
mock(SparkAppStatusRecorder.class);
+        SparkAppStatusRecorder mockRecorder2 = 
mock(SparkAppStatusRecorder.class);
+        SparkApplication mockApp = mock(SparkApplication.class);
+        when(mockApp.getStatus()).thenReturn(appStatus);
+        when(mockApp.getSpec()).thenReturn(appSpec);
+
+        Optional<ReconcileProgress> routineCheckProgress =
+            routineCheck.checkEarlyExitForTerminatedApp(mockApp, 
mockRecorder1);
+        assertTrue(routineCheckProgress.isPresent());
+        Assertions.assertEquals(
+            ReconcileProgress.completeAndNoRequeue(), 
routineCheckProgress.get());
+        verify(mockRecorder1).removeCachedStatus(mockApp);
+
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, 
mockRecorder2);
+        assertTrue(onDemandProgress.isPresent());
+        Assertions.assertEquals(
+            ReconcileProgress.completeAndNoRequeue(), 
onDemandProgress.get());
+        verify(mockRecorder2).removeCachedStatus(mockApp);
+      }
+    }
+  }
+
+  @Test
+  void checkEarlyExitForAppTerminatedWithoutReleaseResourcesInfiniteRetain() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new 
AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(alwaysRetain);
+
+      Optional<ReconcileProgress> routineCheckProgress =
+          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+      assertTrue(routineCheckProgress.isPresent());
+      Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get());
+      verify(mockRecorder1).removeCachedStatus(mockApp);
+
+      Optional<ReconcileProgress> onDemandProgress =
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+      assertFalse(onDemandProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder2);
+    }
+  }
+
+  @Test
+  void checkEarlyExitForAppTerminatedWithoutReleaseResourcesExceededRetainDuration() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(exceedRetainDuration);
+
+      Optional<ReconcileProgress> routineCheckProgress =
+          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+      assertFalse(routineCheckProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder1);
+
+      Optional<ReconcileProgress> onDemandProgress =
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+      assertFalse(onDemandProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder2);
+    }
+  }
+
+  @Test
+  void checkEarlyExitForAppTerminatedWithoutReleaseResourcesWithinRetainDuration() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(notExceedRetainDuration);
+
+      Optional<ReconcileProgress> routineCheckProgress =
+          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+      assertTrue(routineCheckProgress.isPresent());
+      ReconcileProgress reconcileProgress = routineCheckProgress.get();
+      assertTrue(reconcileProgress.isCompleted());
+      assertTrue(reconcileProgress.isRequeue());
+      verifyNoMoreInteractions(mockRecorder1);
+
+      Optional<ReconcileProgress> onDemandProgress =
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+      assertFalse(onDemandProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder2);
+    }
+  }
+
+  @Test
+  void checkEarlyExitForNotTerminatedApp() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    for (ApplicationStateSummary stateSummary : ApplicationStateSummary.values()) {
+      if (stateSummary.isTerminated()) {
+        continue;
+      }
+      ApplicationStatus status = prepareApplicationStatus(stateSummary);
+      for (ApplicationSpec appSpec : specs) {
+        SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+        SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+        SparkApplication mockApp = mock(SparkApplication.class);
+        when(mockApp.getStatus()).thenReturn(status);
+        when(mockApp.getSpec()).thenReturn(appSpec);
+
+        Optional<ReconcileProgress> routineCheckProgress =
+            routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+        assertTrue(routineCheckProgress.isEmpty());
+        verifyNoMoreInteractions(mockRecorder1);
+
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+        assertTrue(onDemandProgress.isEmpty());
+        verifyNoMoreInteractions(mockRecorder2);
+      }
+    }
+  }
+
   private ApplicationStatus prepareApplicationStatus(ApplicationStateSummary currentStateSummary) {
     ApplicationStatus status = new ApplicationStatus();
     ApplicationState state = new ApplicationState(currentStateSummary, "foo");
+    // to make sure the state exceeds threshold
+    state.setLastTransitionTime(Instant.now().minusSeconds(10).toString());
     return status.appendNewState(state);
   }
 
@@ -309,6 +532,7 @@ class AppCleanUpStepTest {
       ApplicationStateSummary currentStateSummary, ApplicationStateSummary previousStateSummary) {
     ApplicationStatus status = prepareApplicationStatus(previousStateSummary);
     ApplicationState state = new ApplicationState(currentStateSummary, "foo");
+    state.setLastTransitionTime(Instant.now().minusSeconds(5).toString());
     return status.appendNewState(state);
   }
 
diff --git a/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml b/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml
new file mode 100644
index 0000000..4460291
--- /dev/null
+++ b/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: spark.apache.org/v1
+kind: SparkApplication
+metadata:
+  name: spark-example-retain-duration
+  namespace: ($SPARK_APP_NAMESPACE)
+status:
+  stateTransitionHistory:
+    (*.currentStateSummary):
+      - "Submitted"
+      - "DriverRequested"
+      - "DriverStarted"
+      - "DriverReady"
+      - "RunningHealthy"
+      - "Succeeded"
+      - "TerminatedWithoutReleaseResources"
+      - "ResourceReleased"
diff --git a/tests/e2e/resource-retain-duration/chainsaw-test.yaml b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
new file mode 100644
index 0000000..2c8b558
--- /dev/null
+++ b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: chainsaw.kyverno.io/v1alpha1
+kind: Test
+metadata:
+  name: garbage-collect-with-retain-duration-test
+spec:
+  scenarios:
+  - bindings:
+      - name: TEST_NAME
+        value: succeeded
+      - name: APPLICATION_FILE_NAME
+        value: spark-example-retain-duration.yaml
+      - name: SPARK_APPLICATION_NAME
+        value: spark-example-retain-duration
+  steps:
+  - try:
+    - script:
+        env:
+          - name: FILE_NAME
+            value: ($APPLICATION_FILE_NAME)
+        content: kubectl apply -f $FILE_NAME
+    - assert:
+        bindings:
+          - name: SPARK_APP_NAMESPACE
+            value: default
+        timeout: 120s
+        file: "../assertions/spark-application/spark-state-transition-with-retain-check.yaml"
+    catch:
+    - describe:
+        apiVersion: spark.apache.org/v1
+        kind: SparkApplication
+        namespace: default
+    finally:
+    - script:
+        env:
+          - name: SPARK_APPLICATION_NAME
+            value: ($SPARK_APPLICATION_NAME)
+        timeout: 120s
+        content: |
+          kubectl delete sparkapplication $SPARK_APPLICATION_NAME
diff --git a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
new file mode 100644
index 0000000..022fdd4
--- /dev/null
+++ b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: spark.apache.org/v1
+kind: SparkApplication
+metadata:
+  name: spark-example-retain-duration
+  namespace: default
+spec:
+  mainClass: "org.apache.spark.examples.SparkPi"
+  jars: "local:///opt/spark/examples/jars/spark-examples.jar"
+  applicationTolerations:
+    resourceRetainPolicy: Always
+    resourceRetainDurationMillis: 10000
+  sparkConf:
+    spark.executor.instances: "1"
+    spark.kubernetes.container.image: "apache/spark:4.0.0-java21-scala"
+    spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
+  runtimeVersions:
+    sparkVersion: 4.0.0
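
For readers following the tests above, the three retain outcomes they exercise (infinite retain, retain duration exceeded, still within the retain duration) can be sketched as a small standalone decision function. This is only an illustrative sketch, not the operator's implementation: `RetainDurationSketch`, `Outcome`, `Decision`, and `decide` are hypothetical names, and treating a negative duration as "retain forever" is an assumption made for this example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class RetainDurationSketch {
  /** Hypothetical outcomes; these are not types from the operator. */
  enum Outcome { KEEP_FOREVER, REQUEUE_LATER, RELEASE_NOW }

  /** Outcome plus, when still retaining, how long to wait before checking again. */
  record Decision(Outcome outcome, Optional<Duration> requeueAfter) {}

  /**
   * Decides what to do with a terminated app's resources.
   * Assumption for this sketch: a negative retainMillis means "no upper bound".
   */
  static Decision decide(Instant lastTransition, long retainMillis, Instant now) {
    if (retainMillis < 0) {
      return new Decision(Outcome.KEEP_FOREVER, Optional.empty());
    }
    Instant deadline = lastTransition.plusMillis(retainMillis);
    if (!now.isBefore(deadline)) {
      // The retain window has elapsed, so clean-up should proceed.
      return new Decision(Outcome.RELEASE_NOW, Optional.empty());
    }
    // Still inside the window: come back once the deadline is reached.
    return new Decision(
        Outcome.REQUEUE_LATER, Optional.of(Duration.between(now, deadline)));
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2025-07-11T00:00:10Z");
    Instant terminatedAt = now.minusSeconds(5);
    System.out.println(decide(terminatedAt, -1L, now).outcome());
    System.out.println(decide(terminatedAt, 1L, now).outcome());
    System.out.println(decide(terminatedAt, 10_000L, now).outcome());
  }
}
```

With a state that transitioned 5 seconds ago, as in the two-argument `prepareApplicationStatus` helper, a 10000 ms retain duration like the one in the e2e example leaves about 5 seconds before release in this sketch.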


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

