This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 43be18c  [SPARK-54010] Support `applicationTolerations.restartConfig.restartCounterResetMillis`
43be18c is described below

commit 43be18ccddfe064cdc97e5c91983f09a951c32e3
Author: Zhou JIANG <[email protected]>
AuthorDate: Mon Nov 3 09:28:56 2025 +0100

    [SPARK-54010] Support `applicationTolerations.restartConfig.restartCounterResetMillis`
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for automatically resetting the restart counter
    based on application attempt duration. It introduces a new
    `restartCounterResetMillis` field in `RestartConfig` that allows the
    restart counter to be reset if an application attempt runs successfully
    for a specified duration before terminating.
    
    Unit tests are also added.
    
    ### Why are the changes needed?
    
    With this feature, users can distinguish between persistent failures
    (quick consecutive crashes) and applications that run for long periods
    between failures.
    
    ### Does this PR introduce _any_ user-facing change?
    
    A new optional configuration field `restartCounterResetMillis` is added
    to the `RestartConfig` spec.
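    
    For illustration, a minimal sketch of where the new field sits in a
    SparkApplication spec (field path per the commit title; the app name and
    sibling values here are illustrative, mirroring the docs example added in
    this patch):
    
    ```yaml
    apiVersion: spark.apache.org/v1
    kind: SparkApplication
    metadata:
      name: example-app  # illustrative name
    spec:
      applicationTolerations:
        restartConfig:
          restartPolicy: Always
          maxRestartAttempts: 3
          # Reset the counter once an attempt survives 1 hour;
          # the default of -1 disables automatic resets.
          restartCounterResetMillis: 3600000
    ```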
    
    ### How was this patch tested?
    
    Added unit tests that validate the restart counter behaves as expected.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #405 from jiangzho/restart_counter.
    
    Authored-by: Zhou JIANG <[email protected]>
    Signed-off-by: Peter Toth <[email protected]>
---
 .../sparkapplications.spark.apache.org-v1.yaml     |   7 +
 .../crds/sparkclusters.spark.apache.org-v1.yaml    |   4 +
 docs/spark_custom_resources.md                     |  25 +++
 .../spark/k8s/operator/spec/RestartConfig.java     |   5 +
 .../k8s/operator/status/ApplicationStatus.java     |  59 +++++-
 .../spark/k8s/operator/status/AttemptInfo.java     |   6 +-
 .../k8s/operator/status/ApplicationStatusTest.java | 223 +++++++++++++++++++++
 7 files changed, 325 insertions(+), 4 deletions(-)

diff --git a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
index 6b2ee7b..54a55af 100644
--- a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
@@ -17426,6 +17426,9 @@ spec:
                           type: integer
                         restartBackoffMillis:
                           type: integer
+                        restartCounterResetMillis:
+                          default: -1
+                          type: integer
                         restartPolicy:
                           enum:
                             - Always
@@ -23735,6 +23738,8 @@ spec:
                       properties:
                         id:
                           type: integer
+                        restartCounter:
+                          type: integer
                       type: object
                     stateTransitionHistory:
                       additionalProperties:
@@ -24895,6 +24900,8 @@ spec:
                       properties:
                         id:
                           type: integer
+                        restartCounter:
+                          type: integer
                       type: object
                     stateTransitionHistory:
                       additionalProperties:
diff --git a/build-tools/helm/spark-kubernetes-operator/crds/sparkclusters.spark.apache.org-v1.yaml b/build-tools/helm/spark-kubernetes-operator/crds/sparkclusters.spark.apache.org-v1.yaml
index 7a8993c..60fdaa6 100644
--- a/build-tools/helm/spark-kubernetes-operator/crds/sparkclusters.spark.apache.org-v1.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/crds/sparkclusters.spark.apache.org-v1.yaml
@@ -21802,6 +21802,8 @@ spec:
                       properties:
                         id:
                           type: integer
+                        restartCounter:
+                          type: integer
                       type: object
                     stateTransitionHistory:
                       additionalProperties:
@@ -21842,6 +21844,8 @@ spec:
                       properties:
                         id:
                           type: integer
+                        restartCounter:
+                          type: integer
                       type: object
                     stateTransitionHistory:
                       additionalProperties:
diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md
index 569503c..dbf5c0e 100644
--- a/docs/spark_custom_resources.md
+++ b/docs/spark_custom_resources.md
@@ -234,6 +234,31 @@ restartConfig:
   restartBackoffMillis: 30000
 ```
 
+### Restart counter reset
+
+The `restartCounterResetMillis` field controls automatic restart counter resets for long-running
+application attempts. When set to a non-negative value (in milliseconds), the operator resets the
+restart counter if an application attempt runs successfully for at least the specified duration
+before failing. This lets users cap the number of restart attempts when an application fails
+quickly (which may indicate an underlying issue beyond the application itself), while still
+allowing indefinite restarts as long as each attempt survives the configured threshold.
+
+For example, setting
+
+```yaml
+
+restartConfig:
+  # 1 hour
+  restartCounterResetMillis: 3600000
+  maxRestartAttempts: 3
+
+```
+
+means the application can fail and restart up to 3 times, but if any attempt runs for more than
+1 hour, the counter resets to zero, allowing another 3 restart attempts.
+
+The default value is -1, which disables automatic counter resets.
+
 ### Timeouts
 
 It's possible to configure applications to be proactively terminated and resubmitted in particular
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java
index f724b43..64b9837 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java
@@ -21,6 +21,7 @@ package org.apache.spark.k8s.operator.spec;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
+import io.fabric8.generator.annotation.Default;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -37,4 +38,8 @@ public class RestartConfig {
   @Builder.Default protected RestartPolicy restartPolicy = RestartPolicy.Never;
   @Builder.Default protected Long maxRestartAttempts = 3L;
   @Builder.Default protected Long restartBackoffMillis = 30000L;
+
+  @Default("-1")
+  @Builder.Default
+  protected Long restartCounterResetMillis = -1L;
 }
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java
index 3d2d713..52a5e8c 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java
@@ -21,6 +21,10 @@ package org.apache.spark.k8s.operator.status;
 
 import static org.apache.spark.k8s.operator.Constants.EXCEED_MAX_RETRY_ATTEMPT_MESSAGE;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -115,7 +119,18 @@ public class ApplicationStatus
           currentAttemptSummary);
     }
 
-    if (currentAttemptSummary.getAttemptInfo().getId() >= restartConfig.getMaxRestartAttempts()) {
+    boolean resetRestartCounter = false;
+    if (restartConfig.getRestartCounterResetMillis() >= 0L) {
+      resetRestartCounter =
+          calculateCurrentAttemptDuration()
+                  .compareTo(Duration.ofMillis(restartConfig.getRestartCounterResetMillis()))
+              >= 0;
+    }
+
+    long effectiveAttemptId =
+        resetRestartCounter ? 0L : currentAttemptSummary.getAttemptInfo().getRestartCounter();
+
+    if (effectiveAttemptId >= restartConfig.getMaxRestartAttempts()) {
       String stateMessage =
           String.format(EXCEED_MAX_RETRY_ATTEMPT_MESSAGE, restartConfig.getMaxRestartAttempts());
       if (stateMessageOverride != null && !stateMessageOverride.isEmpty()) {
@@ -138,7 +153,9 @@ public class ApplicationStatus
           currentAttemptSummary);
     }
 
-    AttemptInfo nextAttemptInfo = currentAttemptSummary.getAttemptInfo().createNextAttemptInfo();
+    AttemptInfo nextAttemptInfo =
+        currentAttemptSummary.getAttemptInfo().createNextAttemptInfo(resetRestartCounter);
+
     ApplicationAttemptSummary nextAttemptSummary = new ApplicationAttemptSummary(nextAttemptInfo);
     ApplicationState state =
         new ApplicationState(ApplicationStateSummary.ScheduledToRestart, stateMessageOverride);
@@ -163,6 +180,44 @@ public class ApplicationStatus
     }
   }
 
+  /**
+   * Finds the first state of the current application attempt.
+   *
+   * <p>This method traverses the state transition history in reverse order to find the most recent
+   * initializing state (e.g., Submitted or ScheduledToRestart), which marks the beginning of the
+   * current attempt. If no initializing state is found, it returns the first entry in the history.
+   *
+   * @return The ApplicationState representing the start of the current attempt.
+   */
+  protected ApplicationState findFirstStateOfCurrentAttempt() {
+    List<Map.Entry<Long, ApplicationState>> entries =
+        new ArrayList<>(stateTransitionHistory.entrySet());
+    for (int k = entries.size() - 1; k >= 0; k--) {
+      Map.Entry<Long, ApplicationState> entry = entries.get(k);
+      if (entry.getValue().getCurrentStateSummary().isInitializing()) {
+        return entry.getValue();
+      }
+    }
+    return entries.get(0).getValue();
+  }
+
+  /**
+   * Calculates the duration of the current application attempt.
+   *
+   * <p>The duration is calculated as the time between the first state of the current attempt (as
+   * determined by {@link #findFirstStateOfCurrentAttempt()}) and the current state's last
+   * transition time. This is particularly useful for determining whether the restart counter should
+   * be reset based on the configured {@code restartCounterResetMillis}.
+   *
+   * @return A Duration representing the time elapsed since the start of the current attempt.
+   */
+  protected Duration calculateCurrentAttemptDuration() {
+    ApplicationState firstStateOfCurrentAttempt = findFirstStateOfCurrentAttempt();
+    return Duration.between(
+        Instant.parse(firstStateOfCurrentAttempt.getLastTransitionTime()),
+        Instant.parse(currentState.getLastTransitionTime()));
+  }
+
   /**
    * Creates an ApplicationState indicating that the application is terminated without releasing
    * resources.
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java
index 8f8e99a..7c28ed9 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java
@@ -26,6 +26,7 @@ import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
+import lombok.Setter;
 import lombok.ToString;
 
 /** Information about an attempt. */
@@ -38,13 +39,14 @@ import lombok.ToString;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AttemptInfo {
   @Getter @Builder.Default protected final long id = 0L;
+  @Getter @Setter protected long restartCounter;
 
   /**
    * Creates a new AttemptInfo object representing the next attempt.
    *
    * @return A new AttemptInfo with an incremented ID.
    */
-  public AttemptInfo createNextAttemptInfo() {
-    return new AttemptInfo(id + 1L);
+  public AttemptInfo createNextAttemptInfo(boolean resetRestartCounter) {
+    return new AttemptInfo(id + 1L, resetRestartCounter ? 1L : restartCounter + 1);
   }
 }
diff --git a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java
index bcc0011..ef5cc8a 100644
--- a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java
+++ b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStatusTest.java
@@ -25,6 +25,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.junit.jupiter.api.Test;
 
 import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
@@ -170,4 +175,222 @@ class ApplicationStatusTest {
     assertEquals(
         1L, maxRestartExceededRetainResource.getCurrentAttemptSummary().getAttemptInfo().getId());
   }
+
+  @Test
+  void testTerminateOrRestartWithRestartCounterReset() throws Exception {
+    RestartConfig restartConfigWithCounter = new RestartConfig();
+    restartConfigWithCounter.setRestartPolicy(RestartPolicy.Always);
+    restartConfigWithCounter.setMaxRestartAttempts(1L);
+    restartConfigWithCounter.setRestartCounterResetMillis(300000L); // 5 minutes
+    String messageOverride = "restart counter test";
+
+    // Create a status with states spanning more than 5 minutes
+    Instant now = Instant.now();
+    Instant tenMinutesAgo = now.minus(Duration.ofMinutes(10));
+
+    ApplicationStatus status = createInitialStatusWithSubmittedTime(tenMinutesAgo);
+
+    status =
+        status
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverReady, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.RunningHealthy, ""))
+            .appendNewState(new ApplicationState(Succeeded, ""));
+
+    // Test restart with counter reset (duration > restartCounterResetMillis)
+    ApplicationStatus restartWithReset =
+        status.terminateOrRestart(
+            restartConfigWithCounter, ResourceRetainPolicy.Never, messageOverride, false);
+
+    assertEquals(
+        ApplicationStateSummary.ScheduledToRestart,
+        restartWithReset.getCurrentState().getCurrentStateSummary());
+    assertTrue(restartWithReset.getCurrentState().getMessage().contains(messageOverride));
+    // Counter should be reset in current attempt therefore it's 1 in the new attempt, next attempt
+    // ID is also 1
+    assertEquals(1L, restartWithReset.getCurrentAttemptSummary().getAttemptInfo().getId());
+    assertEquals(
+        1L, restartWithReset.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+
+    // Test reset restart counter with previous attempt in history
+    Instant tenMinutesLater = now.plus(Duration.ofMinutes(10));
+    ApplicationState secondAttemptStoppingState = new ApplicationState(Succeeded, "");
+    secondAttemptStoppingState.setLastTransitionTime(tenMinutesLater.toString());
+    status =
+        restartWithReset
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverReady, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.RunningHealthy, ""))
+            .appendNewState(secondAttemptStoppingState);
+
+    ApplicationStatus secondAttemptRestart =
+        status.terminateOrRestart(
+            restartConfigWithCounter, ResourceRetainPolicy.Never, messageOverride, false);
+    assertEquals(
+        ApplicationStateSummary.ScheduledToRestart,
+        secondAttemptRestart.getCurrentState().getCurrentStateSummary());
+    // Counter should be reset in current attempt therefore it's again 1 in the new attempt, next
+    // attempt ID is incremented to 2
+    assertEquals(2L, secondAttemptRestart.getCurrentAttemptSummary().getAttemptInfo().getId());
+    assertEquals(
+        1L, secondAttemptRestart.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+
+    // validate status with history trimmed
+    ApplicationStatus secondAttemptRestartTrimmed =
+        status.terminateOrRestart(
+            restartConfigWithCounter, ResourceRetainPolicy.Never, messageOverride, true);
+    assertEquals(
+        ApplicationStateSummary.ScheduledToRestart,
+        secondAttemptRestartTrimmed.getCurrentState().getCurrentStateSummary());
+    assertTrue(
+        secondAttemptRestartTrimmed.getCurrentState().getMessage().contains(messageOverride));
+    assertEquals(
+        2L, secondAttemptRestartTrimmed.getCurrentAttemptSummary().getAttemptInfo().getId());
+    assertEquals(
+        1L,
+        secondAttemptRestartTrimmed
+            .getCurrentAttemptSummary()
+            .getAttemptInfo()
+            .getRestartCounter());
+
+    // Test restart without counter reset (duration < restartCounterResetMillis)
+    Instant twoMinutesLater = now.plus(Duration.ofMinutes(2));
+    ApplicationState thirdAttemptEnd = new ApplicationState(Succeeded, "recent");
+    thirdAttemptEnd.setLastTransitionTime(twoMinutesLater.toString());
+
+    status =
+        secondAttemptRestart
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(thirdAttemptEnd);
+    ApplicationStatus thirdAttemptTerminate =
+        status.terminateOrRestart(
+            restartConfigWithCounter, ResourceRetainPolicy.Never, messageOverride, false);
+    assertEquals(
+        ApplicationStateSummary.ResourceReleased,
+        thirdAttemptTerminate.getCurrentState().getCurrentStateSummary());
+    // Counter should not be reset in current attempt
+    assertEquals(2L, thirdAttemptTerminate.getCurrentAttemptSummary().getAttemptInfo().getId());
+    assertEquals(
+        1L, thirdAttemptTerminate.getCurrentAttemptSummary().getAttemptInfo().getRestartCounter());
+
+    // Test restart without counter reset in a trimmed status
+    status =
+        secondAttemptRestartTrimmed
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(thirdAttemptEnd);
+    ApplicationStatus thirdAttemptTerminateTrimmed =
+        status.terminateOrRestart(
+            restartConfigWithCounter, ResourceRetainPolicy.Never, messageOverride, true);
+    assertEquals(
+        ApplicationStateSummary.ResourceReleased,
+        thirdAttemptTerminateTrimmed.getCurrentState().getCurrentStateSummary());
+    assertTrue(thirdAttemptTerminateTrimmed.getCurrentState().getMessage().contains(messageOverride));
+    // Counter should not be reset in current attempt
+    assertEquals(
+        2L, thirdAttemptTerminateTrimmed.getCurrentAttemptSummary().getAttemptInfo().getId());
+    assertEquals(
+        1L,
+        thirdAttemptTerminateTrimmed
+            .getCurrentAttemptSummary()
+            .getAttemptInfo()
+            .getRestartCounter());
+  }
+
+  @Test
+  void testFindFirstStateOfCurrentAttempt() throws Exception {
+    // Test with single state (Submitted)
+    ApplicationStatus status = new ApplicationStatus();
+    ApplicationState firstState = status.findFirstStateOfCurrentAttempt();
+    assertEquals(Submitted, firstState.getCurrentStateSummary());
+
+    // Test with multiple states including initializing state
+    ApplicationStatus statusWithRestart =
+        new ApplicationStatus()
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.RunningHealthy, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.Failed, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.ScheduledToRestart, ""));
+
+    ApplicationState firstStateAfterRestart = statusWithRestart.findFirstStateOfCurrentAttempt();
+    assertEquals(
+        ApplicationStateSummary.ScheduledToRestart,
+        firstStateAfterRestart.getCurrentStateSummary());
+
+    // Test with states but no initializing state (should return first entry)
+    Map<Long, ApplicationState> history = new TreeMap<>();
+    history.put(0L, new ApplicationState(ApplicationStateSummary.DriverRequested, ""));
+    history.put(1L, new ApplicationState(ApplicationStateSummary.RunningHealthy, ""));
+    history.put(2L, new ApplicationState(Succeeded, ""));
+    ApplicationStatus statusNoInitializing =
+        new ApplicationStatus(
+            new ApplicationState(Succeeded, ""),
+            history,
+            new ApplicationAttemptSummary(),
+            new ApplicationAttemptSummary());
+
+    ApplicationState firstStateNoInit = statusNoInitializing.findFirstStateOfCurrentAttempt();
+    assertEquals(
+        ApplicationStateSummary.DriverRequested, firstStateNoInit.getCurrentStateSummary());
+  }
+
+  @Test
+  void testCalculateCurrentAttemptDuration() throws Exception {
+    // Test with a bare, newly created status
+    ApplicationStatus status = new ApplicationStatus();
+    Duration duration = status.calculateCurrentAttemptDuration();
+    assertNotNull(duration);
+    assertTrue(duration.toMillis() >= 0);
+
+    // Test with multiple states
+    ApplicationStatus statusWithStates =
+        new ApplicationStatus()
+            .appendNewState(new ApplicationState(Submitted, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.RunningHealthy, ""));
+
+    Duration durationMultipleStates = statusWithStates.calculateCurrentAttemptDuration();
+    assertNotNull(durationMultipleStates);
+    assertTrue(durationMultipleStates.toMillis() >= 0);
+
+    // Test with restart scenario - duration should be calculated from ScheduledToRestart state
+    // Create states with explicit timestamps
+    Instant now = Instant.now();
+    Instant oneHourAgo = now.minus(Duration.ofHours(1));
+    Instant tenMinutesAgo = now.minus(Duration.ofMinutes(10));
+
+    ApplicationState expectedSecondAttemptStart =
+        new ApplicationState(ApplicationStateSummary.ScheduledToRestart, "");
+    expectedSecondAttemptStart.setLastTransitionTime(tenMinutesAgo.toString());
+    ApplicationState expectedSecondAttemptEnd =
+        new ApplicationState(ApplicationStateSummary.Failed, "");
+
+    ApplicationStatus statusWithRestarts =
+        createInitialStatusWithSubmittedTime(oneHourAgo)
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.RunningHealthy, ""))
+            .appendNewState(new ApplicationState(ApplicationStateSummary.Failed, ""))
+            .appendNewState(expectedSecondAttemptStart)
+            .appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, ""))
+            .appendNewState(expectedSecondAttemptEnd);
+
+    // Verify it finds ScheduledToRestart as the first state of current attempt
+    ApplicationState firstState = statusWithRestarts.findFirstStateOfCurrentAttempt();
+    assertEquals(expectedSecondAttemptStart, firstState);
+
+    // Verify duration is calculated from ScheduledToRestart state (10 minutes ago)
+    Duration durationAfterRestart = statusWithRestarts.calculateCurrentAttemptDuration();
+    assertNotNull(durationAfterRestart);
+
+    Duration expectedDuration =
+        Duration.between(
+            tenMinutesAgo, Instant.parse(expectedSecondAttemptEnd.getLastTransitionTime()));
+    assertEquals(expectedDuration, durationAfterRestart);
+  }
+
+  private ApplicationStatus createInitialStatusWithSubmittedTime(Instant submittedTime) {
+    ApplicationStatus status = new ApplicationStatus();
+    ApplicationState submittedState = status.getStateTransitionHistory().get(0L);
+    submittedState.setLastTransitionTime(submittedTime.toString());
+    return status;
+  }
 }

