This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e80dead  [SPARK-53706] App reconcile steps should properly handle exceptions in status update
e80dead is described below

commit e80deada70ba5357d7f5c55c5e8583c3941dc3a5
Author: Zhou JIANG <[email protected]>
AuthorDate: Thu Sep 25 14:47:15 2025 -0700

    [SPARK-53706] App reconcile steps should properly handle exceptions in status update
    
    ### What changes were proposed in this pull request?
    
    This PR adds exception handling to the status recorder when persisting status. If an exception is encountered while updating the app status, the reconciler finishes the current reconcile loop and requeues a new one instead of blindly throwing the exception.
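    
    Below is a minimal sketch of the new contract. It uses simplified stand-in types rather than the operator's real classes. `Progress` stands in for `ReconcileProgress`, `ClientException` for fabric8's `KubernetesClientException`, and a `Consumer<String>` plays the role of the versioned status patch; all three are hypothetical. A failed write is reported as `false` and becomes an immediate requeue, so the exception no longer escapes the reconcile loop.
    
    ```java
    import java.util.function.Consumer;
    
    // Hypothetical sketch: names are stand-ins, not the operator's API.
    class AttemptStatusUpdateSketch {
    
      // Simplified stand-in for ReconcileProgress.
      enum Progress { DEFAULT_REQUEUE, IMMEDIATE_REQUEUE }
    
      // Stand-in for io.fabric8.kubernetes.client.KubernetesClientException.
      static class ClientException extends RuntimeException {
        ClientException(String message) {
          super(message);
        }
      }
    
      // Like the patched persistStatus: report failure as a boolean instead of throwing.
      static boolean persistStatus(Consumer<String> patchStatus, String newStatus) {
        try {
          patchStatus.accept(newStatus);
          return true;
        } catch (ClientException e) {
          System.err.println("Error while persisting status " + newStatus + ": " + e.getMessage());
          return false;
        }
      }
    
      // Like attemptStatusUpdate: keep the intended progress only if the write succeeded.
      static Progress attemptStatusUpdate(
          Consumer<String> patchStatus, String newStatus, Progress onSuccess) {
        return persistStatus(patchStatus, newStatus) ? onSuccess : Progress.IMMEDIATE_REQUEUE;
      }
    
      public static void main(String[] args) {
        Consumer<String> accepted = status -> {};
        Consumer<String> conflicting =
            status -> {
              throw new ClientException("409 Conflict: the object has been modified");
            };
        System.out.println(attemptStatusUpdate(accepted, "DriverRequested", Progress.DEFAULT_REQUEUE));
        System.out.println(
            attemptStatusUpdate(conflicting, "DriverRequested", Progress.DEFAULT_REQUEUE));
      }
    }
    ```
    
    Running `main` prints `DEFAULT_REQUEUE` and then `IMMEDIATE_REQUEUE`. The simulated conflict is logged on stderr and is not propagated to the caller.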
    
    ### Why are the changes needed?
    
    SparkAppReconciler does not handle exceptions when updating app status. A failed reconcile loop may end up retrying endlessly if the status update fails because of a conflict.
    
    For example, we observe exceptions like the following when an app is starting:
    
    ```
    Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://kind-control-plane.vsl:6443/apis/spark.apache.org/v1/namespaces/default/sparkapplications/spark-example-retain-duration/status. Message: Operation cannot be fulfilled on sparkapplications.spark.apache.org "spark-example-retain-duration": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details [...]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:642) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:622) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:582) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:549) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
    at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:141) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?]
    ```
    
    Why is this happening? A driver pod status update can trigger a new reconcile while another reconcile is still in progress. A status update can then be sent against an outdated version of the resource, and the API server rejects it with `409 Conflict`. Without proper exception handling, this keeps recurring. It wastes CPU time and confuses users reading the logs.
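    
    The conflict comes from optimistic concurrency on the resource version. Below is a minimal in-memory model of it. It assumes the status write is version-checked, which the 409 message above and the name `patchAndStatusWithVersionLocked` suggest. In the real system, the Kubernetes API server performs the check. The class and its methods are hypothetical. Two overlapping reconciles read the same version, the first write wins, and the second write is rejected.
    
    ```java
    // Hypothetical in-memory model; the real check is done by the Kubernetes API server.
    class OptimisticStatusStoreSketch {
    
      // Stand-in for the API server's 409 Conflict response.
      static class ConflictException extends RuntimeException {
        ConflictException(String message) {
          super(message);
        }
      }
    
      private long resourceVersion = 1;
      private String status = "Submitted";
    
      synchronized long currentVersion() {
        return resourceVersion;
      }
    
      synchronized String currentStatus() {
        return status;
      }
    
      // Accepts the write only if it is based on the latest version, then bumps the version.
      synchronized void replaceStatus(long basedOnVersion, String newStatus) {
        if (basedOnVersion != resourceVersion) {
          throw new ConflictException("409 Conflict: the object has been modified");
        }
        status = newStatus;
        resourceVersion++;
      }
    
      public static void main(String[] args) {
        OptimisticStatusStoreSketch store = new OptimisticStatusStoreSketch();
        // Reconcile A and reconcile B overlap, so both read version 1.
        long seenByA = store.currentVersion();
        long seenByB = store.currentVersion();
        store.replaceStatus(seenByA, "DriverRequested");
        try {
          store.replaceStatus(seenByB, "DriverRequested");
        } catch (ConflictException e) {
          System.out.println("Reconcile B rejected: " + e.getMessage());
        }
        System.out.println(store.currentStatus() + " at version " + store.currentVersion());
      }
    }
    ```
    
    In this model, retrying the same stale write can never succeed. Only a new attempt that re-reads the current version can succeed, which is why requeuing a fresh reconcile is preferable to rethrowing.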
    
    This patch also fixes another corner case. The current `AppInitStep` marks the app as `SchedulingFailure` even when the resources were requested as expected and only the status update failed. This patch handles exceptions from resource creation and from the status update separately.
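    
    Below is a minimal sketch of that separation, using simplified stand-ins. A `Runnable` requests the driver resources, a boolean-returning `StatusWriter` stands in for the status recorder, and states are plain strings; all of these are hypothetical. Only an exception from the resource request leads to `SchedulingFailure`. If the request succeeds but the status write is rejected, the step simply requeues immediately.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical sketch of the init step's control flow; not the operator's implementation.
    class InitStepSketch {
    
      enum Progress { DEFAULT_REQUEUE, IMMEDIATE_REQUEUE }
    
      // Stand-in for the status recorder: returns false when the write is rejected.
      interface StatusWriter {
        boolean persist(String state);
      }
    
      static Progress attemptStatusUpdate(StatusWriter writer, String state, Progress onSuccess) {
        return writer.persist(state) ? onSuccess : Progress.IMMEDIATE_REQUEUE;
      }
    
      static Progress reconcileInit(Runnable requestDriverResources, StatusWriter writer) {
        try {
          requestDriverResources.run();
        } catch (RuntimeException e) {
          // Only a failure while requesting resources counts as a scheduling failure.
          return attemptStatusUpdate(writer, "SchedulingFailure", Progress.IMMEDIATE_REQUEUE);
        }
        // Resources were requested; a rejected status write only means "try again soon".
        return attemptStatusUpdate(writer, "DriverRequested", Progress.DEFAULT_REQUEUE);
      }
    
      public static void main(String[] args) {
        List<String> attempted = new ArrayList<>();
        StatusWriter rejectingWriter =
            state -> {
              attempted.add(state);
              return false;
            };
        Progress progress = reconcileInit(() -> {}, rejectingWriter);
        System.out.println(progress + " " + attempted); // IMMEDIATE_REQUEUE [DriverRequested]
      }
    }
    ```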
    
    To put it another way: if an exception is thrown while updating the app status (which typically happens at the end of each reconcile), the operator should properly finish the current reconcile loop and start a new one. The app status is fetched from the cache at the beginning of each reconcile, and our reconcile steps are already designed to be idempotent, so retrying in a fresh loop is safe.
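    
    Below is a minimal model of why retrying in a new loop is safe. It assumes apply-style (create-or-update) resource requests and a status write that fails once. The class, its fields, and the `Submitted`/`DriverRequested` gating are hypothetical simplifications of the real steps. In the first loop, the resources are created but the status write fails. The second loop starts again from the last persisted state. It re-applies the same objects without duplicating them and then records `DriverRequested`. This corresponds to the scenario exercised by the new `appInitStepShouldBeIdempotentWhenStatusUpdateFails` test.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    // Hypothetical model of an idempotent init step; not the operator's implementation.
    class IdempotentReconcileSketch {
    
      enum Progress { DEFAULT_REQUEUE, IMMEDIATE_REQUEUE }
    
      // Objects "in the cluster", keyed by name; applying again overwrites instead of duplicating.
      final Map<String, String> cluster = new LinkedHashMap<>();
      // Last successfully persisted state, i.e. what the next loop reads at its start.
      String persistedState = "Submitted";
      private int statusFailuresLeft;
    
      IdempotentReconcileSketch(int statusFailuresLeft) {
        this.statusFailuresLeft = statusFailuresLeft;
      }
    
      private void apply(String name, String spec) {
        cluster.put(name, spec);
      }
    
      private boolean persistStatus(String newState) {
        if (statusFailuresLeft > 0) {
          statusFailuresLeft--;
          return false; // e.g. rejected with 409 Conflict
        }
        persistedState = newState;
        return true;
      }
    
      Progress reconcile() {
        // Each loop decides from the persisted state only, not from a previous loop's leftovers.
        if (!"Submitted".equals(persistedState)) {
          return Progress.DEFAULT_REQUEUE; // other steps would take over here
        }
        apply("driver-pod", "pod-spec");
        apply("resource-configmap", "configmap-spec");
        return persistStatus("DriverRequested") ? Progress.DEFAULT_REQUEUE : Progress.IMMEDIATE_REQUEUE;
      }
    
      public static void main(String[] args) {
        IdempotentReconcileSketch app = new IdempotentReconcileSketch(1);
        for (int loop = 1; loop <= 2; loop++) {
          Progress progress = app.reconcile();
          System.out.println("loop " + loop + ": " + progress + ", state=" + app.persistedState
              + ", objects=" + app.cluster.keySet());
        }
      }
    }
    ```
    
    The first loop ends with `IMMEDIATE_REQUEUE` and the state is still `Submitted`. The second loop ends with `DEFAULT_REQUEUE` and `DriverRequested`. Both loops leave exactly two objects in the modeled cluster.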
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tested from two perspectives:
    
    1. CI: also added a new unit test to validate update idempotency in the init step.
    2. Log inspection: with this patch, exceptions like the one above are no longer thrown while running apps.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #341 from jiangzho/status_patch.
    
    Authored-by: Zhou JIANG <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../reconciler/reconcilesteps/AppInitStep.java     | 25 +++++------
 .../reconcilesteps/AppReconcileStep.java           | 51 ++++++++++++++++++----
 .../reconciler/reconcilesteps/AppRunningStep.java  |  9 ++--
 .../reconcilesteps/AppUnknownStateStep.java        |  7 ++-
 .../reconciler/reconcilesteps/AppValidateStep.java | 10 +++--
 .../k8s/operator/utils/SparkAppStatusRecorder.java | 10 +++--
 .../spark/k8s/operator/utils/StatusRecorder.java   | 13 ++++--
 .../reconcilesteps/AppCleanUpStepTest.java         |  7 +++
 .../reconciler/reconcilesteps/AppInitStepTest.java | 47 +++++++++++++++++++-
 9 files changed, 140 insertions(+), 39 deletions(-)

diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
index f8cbfce..cc5c36a 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java
@@ -103,30 +103,29 @@ public class AppInitStep extends AppReconcileStep {
           }
         }
       }
-      ApplicationStatus updatedStatus =
-          context
-              .getResource()
-              .getStatus()
-              .appendNewState(
-                  new ApplicationState(
-                      ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
-      statusRecorder.persistStatus(context, updatedStatus);
-      return completeAndDefaultRequeue();
     } catch (Exception e) {
       if (log.isErrorEnabled()) {
         log.error("Failed to request driver resource.", e);
       }
       String errorMessage =
           Constants.SCHEDULE_FAILURE_MESSAGE + " StackTrace: " + buildGeneralErrorMessage(e);
-      statusRecorder.persistStatus(
-          context,
+      ApplicationStatus updatedStatus =
           context
               .getResource()
               .getStatus()
               .appendNewState(
-                  new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage)));
-      return completeAndImmediateRequeue();
+                  new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage));
+      return attemptStatusUpdate(
+          context, statusRecorder, updatedStatus, completeAndImmediateRequeue());
     }
+    ApplicationStatus updatedStatus =
+        context
+            .getResource()
+            .getStatus()
+            .appendNewState(
+                new ApplicationState(
+                    ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
+    return attemptStatusUpdate(context, statusRecorder, updatedStatus, completeAndDefaultRequeue());
   }
 
   /**
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java
index 3502708..ba56176 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Optional;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.context.SparkAppContext;
@@ -38,6 +39,7 @@ import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
 
 /** Basic reconcile step for application. */
+@Log4j2
 public abstract class AppReconcileStep {
   /**
    * Reconciles a specific step for a Spark application.
@@ -77,18 +79,45 @@ public abstract class AppReconcileStep {
         for (ApplicationState state : stateUpdates) {
           currentStatus = currentStatus.appendNewState(state);
         }
-        statusRecorder.persistStatus(context, currentStatus);
-        return completeAndImmediateRequeue();
+        return attemptStatusUpdate(
+            context, statusRecorder, currentStatus, completeAndImmediateRequeue());
       }
     } else {
       ApplicationStatus updatedStatus = currentStatus.appendNewState(driverUnexpectedRemoved());
-      statusRecorder.persistStatus(context, updatedStatus);
+      return attemptStatusUpdate(
+          context, statusRecorder, updatedStatus, completeAndImmediateRequeue());
+    }
+  }
+
+  /**
+   * Updates the application status - if the status is successfully persisted, proceed with the
+   * given progress. Otherwise, completes current reconcile loop immediately and requeue. Latest
+   * application status would be fetched from cache in next reconcile attempt.
+   *
+   * @param context The SparkAppContext for the application.
+   * @param statusRecorder The SparkAppStatusRecorder for recording status updates.
+   * @param updatedStatus The updated ApplicationStatus.
+   * @param progressUponSuccessStatusUpdate The ReconcileProgress if the status update has been
+   *     persisted successfully.
+   * @return The ReconcileProgress for next steps.
+   */
+  protected ReconcileProgress attemptStatusUpdate(
+      final SparkAppContext context,
+      final SparkAppStatusRecorder statusRecorder,
+      final ApplicationStatus updatedStatus,
+      final ReconcileProgress progressUponSuccessStatusUpdate) {
+
+    if (statusRecorder.persistStatus(context, updatedStatus)) {
+      return progressUponSuccessStatusUpdate;
+    } else {
+      log.warn("Failed to persist status, will retry status update in next reconcile attempt");
       return completeAndImmediateRequeue();
     }
   }
 
   /**
-   * Updates the application status and re-queues the reconciliation after a specified duration.
+   * Updates the application status and re-queues the reconciliation after a specified duration. If
+   * the status update fails, trigger an immediate requeue.
    *
    * @param context The SparkAppContext for the application.
    * @param statusRecorder The SparkAppStatusRecorder for recording status updates.
@@ -101,13 +130,16 @@ public abstract class AppReconcileStep {
       SparkAppStatusRecorder statusRecorder,
       ApplicationStatus updatedStatus,
       Duration requeueAfter) {
-    statusRecorder.persistStatus(context, updatedStatus);
-    return ReconcileProgress.completeAndRequeueAfter(requeueAfter);
+    return attemptStatusUpdate(
+        context,
+        statusRecorder,
+        updatedStatus,
+        ReconcileProgress.completeAndRequeueAfter(requeueAfter));
   }
 
   /**
    * Appends a new state to the application status, persists it, and re-queues the reconciliation
-   * after a specified duration.
+   * after a specified duration. If the status update fails, trigger an immediate requeue.
    *
    * @param context The SparkAppContext for the application.
    * @param statusRecorder The SparkAppStatusRecorder for recording status updates.
@@ -120,7 +152,10 @@ public abstract class AppReconcileStep {
       SparkAppStatusRecorder statusRecorder,
       ApplicationState newState,
       Duration requeueAfter) {
-    statusRecorder.appendNewStateAndPersist(context, newState);
+    if (!statusRecorder.appendNewStateAndPersist(context, newState)) {
+      log.warn("Status is not persisted successfully, will retry in next reconcile attempt");
+      return completeAndImmediateRequeue();
+    }
     return ReconcileProgress.completeAndRequeueAfter(requeueAfter);
   }
 
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
index 55d3544..897c46e 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
@@ -33,6 +33,7 @@ import org.apache.spark.k8s.operator.reconciler.observers.AppDriverRunningObserv
 import org.apache.spark.k8s.operator.spec.ExecutorInstanceConfig;
 import org.apache.spark.k8s.operator.status.ApplicationState;
 import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.k8s.operator.utils.PodUtils;
 import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
 
@@ -87,13 +88,13 @@ public class AppRunningStep extends AppReconcileStep {
       return observeDriver(
           context, statusRecorder, Collections.singletonList(new AppDriverRunningObserver()));
     } else {
-      statusRecorder.persistStatus(
-          context,
+      ApplicationStatus updatedStatus =
           context
               .getResource()
               .getStatus()
-              .appendNewState(new ApplicationState(proposedStateSummary, stateMessage)));
-      return completeAndDefaultRequeue();
+              .appendNewState(new ApplicationState(proposedStateSummary, stateMessage));
+      return attemptStatusUpdate(
+          context, statusRecorder, updatedStatus, completeAndDefaultRequeue());
     }
   }
 }
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java
index 09cf195..d103efd 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java
@@ -46,7 +46,10 @@ public class AppUnknownStateStep extends AppReconcileStep {
         new ApplicationState(ApplicationStateSummary.Failed, Constants.UNKNOWN_STATE_MESSAGE);
     Optional<Pod> driver = context.getDriverPod();
     driver.ifPresent(pod -> state.setLastObservedDriverStatus(pod.getStatus()));
-    statusRecorder.persistStatus(context, context.getResource().getStatus().appendNewState(state));
-    return ReconcileProgress.completeAndImmediateRequeue();
+    return attemptStatusUpdate(
+        context,
+        statusRecorder,
+        context.getResource().getStatus().appendNewState(state),
+        ReconcileProgress.completeAndImmediateRequeue());
   }
 }
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java
index f3f5e17..174328d 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java
@@ -48,14 +48,16 @@ public class AppValidateStep extends AppReconcileStep {
       SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
     if (!isValidApplicationStatus(context.getResource())) {
       log.warn("Spark application found with empty status. Resetting to initial state.");
-      statusRecorder.persistStatus(context, new ApplicationStatus());
+      return attemptStatusUpdate(context, statusRecorder, new ApplicationStatus(), proceed());
     }
     if (ClientMode.equals(context.getResource().getSpec())) {
       ApplicationState failure =
           new ApplicationState(ApplicationStateSummary.Failed, "Client mode is not supported yet.");
-      statusRecorder.persistStatus(
-          context, context.getResource().getStatus().appendNewState(failure));
-      return completeAndImmediateRequeue();
+      return attemptStatusUpdate(
+          context,
+          statusRecorder,
+          context.getResource().getStatus().appendNewState(failure),
+          completeAndImmediateRequeue());
     }
     return proceed();
   }
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
index 2123aaf..16ddfe1 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
@@ -51,11 +51,15 @@ public class SparkAppStatusRecorder
    *
    * @param context The SparkAppContext for the application.
    * @param newState The new ApplicationState to append.
+   * @return true if the status is successfully patched
    */
-  public void appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) {
+  public boolean appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) {
     ApplicationStatus appStatus = context.getResource().getStatus();
-    recorderSource.recordStatusUpdateLatency(appStatus, newState);
     ApplicationStatus updatedStatus = appStatus.appendNewState(newState);
-    persistStatus(context, updatedStatus);
+    boolean statusPersisted = persistStatus(context, updatedStatus);
+    if (statusPersisted) {
+      recorderSource.recordStatusUpdateLatency(appStatus, newState);
+    }
+    return statusPersisted;
   }
 }
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java
index f3f8f75..b41cf99 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java
@@ -127,10 +127,17 @@ public class StatusRecorder<
    *
    * @param context The BaseContext containing the resource and client.
    * @param newStatus The new status to persist.
+   * @return true if the status is successfully patched.
    */
-  public void persistStatus(BaseContext<CR> context, STATUS newStatus) {
-    context.getResource().setStatus(newStatus);
-    patchAndStatusWithVersionLocked(context.getResource(), context.getClient());
+  public boolean persistStatus(BaseContext<CR> context, STATUS newStatus) {
+    try {
+      context.getResource().setStatus(newStatus);
+      patchAndStatusWithVersionLocked(context.getResource(), context.getClient());
+      return true;
+    } catch (KubernetesClientException e) {
+      log.error("Error while persisting status to {}", newStatus, e);
+      return false;
+    }
   }
 
   /**
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
index 1fbf563..1ff19fd 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
@@ -21,6 +21,7 @@ package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -171,6 +172,8 @@ class AppCleanUpStepTest {
         when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
         when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
         when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+        when(mockRecorder.persistStatus(eq(mockAppContext), any())).thenReturn(true);
+        when(mockRecorder.appendNewStateAndPersist(eq(mockAppContext), any())).thenReturn(true);
 
         try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
           ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder);
@@ -259,6 +262,8 @@ class AppCleanUpStepTest {
     when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
     when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
     when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+    when(mockRecorder.persistStatus(eq(mockAppContext), any())).thenReturn(true);
+    when(mockRecorder.appendNewStateAndPersist(eq(mockAppContext), any())).thenReturn(true);
 
     try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
       ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder);
@@ -317,6 +322,8 @@ class AppCleanUpStepTest {
     when(mockAppContext2.getDriverPreResourcesSpec())
         .thenReturn(Collections.singletonList(resource1));
     when(mockAppContext2.getDriverResourcesSpec()).thenReturn(Collections.singletonList(resource2));
+    when(mockRecorder.persistStatus(any(), any())).thenReturn(true);
+    when(mockRecorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
 
     try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
       ReconcileProgress progress1 = cleanUpWithReason.reconcile(mockAppContext1, mockRecorder);
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java
index 6495b71..0131025 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -49,6 +50,7 @@ import org.mockito.ArgumentCaptor;
 
 import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.context.SparkAppContext;
+import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
 import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
 
 @EnableKubernetesMockClient(crud = true)
@@ -106,7 +108,10 @@ class AppInitStepTest {
     when(mocksparkAppContext.getDriverResourcesSpec())
         .thenReturn(Collections.singletonList(resourceConfigMapSpec));
     when(mocksparkAppContext.getClient()).thenReturn(kubernetesClient);
-    appInitStep.reconcile(mocksparkAppContext, recorder);
+    when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
+    when(recorder.persistStatus(any(), any())).thenReturn(true);
+    ReconcileProgress reconcileProgress = appInitStep.reconcile(mocksparkAppContext, recorder);
+    Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress);
     Pod createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
     ConfigMap createCM =
         kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
@@ -135,6 +140,8 @@ class AppInitStepTest {
         .thenReturn(Collections.singletonList(preResourceConfigMapSpec));
     when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
     when(mocksparkAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+    when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
+    when(recorder.persistStatus(any(), any())).thenReturn(true);
 
     KubernetesClient mockClient = mock(KubernetesClient.class);
     when(mocksparkAppContext.getClient()).thenReturn(mockClient);
@@ -166,8 +173,9 @@ class AppInitStepTest {
     when(mockClient.resourceList(anyList())).thenReturn(mockList);
     when(mockList.forceConflicts()).thenReturn(mockServerSideApplicable);
 
-    appInitStep.reconcile(mocksparkAppContext, recorder);
+    ReconcileProgress reconcileProgress = appInitStep.reconcile(mocksparkAppContext, recorder);
 
+    Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress);
     ArgumentCaptor<List<ConfigMap>> argument = ArgumentCaptor.forClass(List.class);
     verify(mockClient).resourceList(argument.capture());
     Assertions.assertEquals(1, argument.getValue().size());
@@ -184,4 +192,39 @@ class AppInitStepTest {
         decoratedConfigMap.getMetadata().getOwnerReferences().get(0).getKind());
     Assertions.assertTrue(decoratedConfigMap.getMetadata().getManagedFields().isEmpty());
   }
+
+  @Test
+  void appInitStepShouldBeIdempotentWhenStatusUpdateFails() {
+    AppInitStep appInitStep = new AppInitStep();
+    SparkAppContext mocksparkAppContext = mock(SparkAppContext.class);
+    SparkAppStatusRecorder recorder = mock(SparkAppStatusRecorder.class);
+    SparkApplication application = new SparkApplication();
+    application.setMetadata(applicationMetadata);
+    when(mocksparkAppContext.getResource()).thenReturn(application);
+    when(mocksparkAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+    when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
+    when(mocksparkAppContext.getDriverResourcesSpec())
+        .thenReturn(Collections.singletonList(resourceConfigMapSpec));
+    when(mocksparkAppContext.getClient()).thenReturn(kubernetesClient);
+    when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(false, true);
+    when(recorder.persistStatus(any(), any())).thenReturn(false, true);
+
+    // If the first reconcile manages to create everything but fails to update status
+    ReconcileProgress reconcileProgress1 = appInitStep.reconcile(mocksparkAppContext, recorder);
+    Assertions.assertEquals(ReconcileProgress.completeAndImmediateRequeue(), reconcileProgress1);
+    Pod createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
+    ConfigMap createCM =
+        kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
+    Assertions.assertNotNull(createCM);
+    Assertions.assertNotNull(createdPod);
+
+    // The second reconcile shall update the status without re-creating everything
+    ReconcileProgress reconcileProgress2 = appInitStep.reconcile(mocksparkAppContext, recorder);
+    Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress2);
+    createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
+    createCM =
+        kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
+    Assertions.assertNotNull(createCM);
+    Assertions.assertNotNull(createdPod);
+  }
 }

