This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0b72793 [SPARK-55611] Add `RunningWithPartialCapacity` state for
`SparkApplication` CRD
0b72793 is described below
commit 0b72793a59de3ccff2b09fe82e2cd1b7acf1436d
Author: Zhou JIANG <[email protected]>
AuthorDate: Wed Feb 25 03:28:43 2026 -0800
[SPARK-55611] Add `RunningWithPartialCapacity` state for `SparkApplication`
CRD
### What changes were proposed in this pull request?
This PR adds a new `RunningWithPartialCapacity` state to
`ApplicationStateSummary` that indicates when a Spark application with dynamic
allocation disabled is running with executor count between minimum and maximum
thresholds.
### Why are the changes needed?
When dynamic allocation is disabled, if an application has fewer executors
than requested (between min and max), it indicates resource constraints in the
cluster. This is a tolerable but suboptimal state that should be visible to
operators.
With dynamic allocation enabled, executors scaling between min and max is
normal behavior, so `RunningHealthy` is appropriate.
The new state provides better observability into cluster resource
constraints affecting application performance.
### Does this PR introduce _any_ user-facing change?
Yes. Users will now see the `RunningWithPartialCapacity` status for
applications that run with dynamic allocation disabled and an executor count
between the configured minimum and maximum thresholds.
### How was this patch tested?
Added unit tests in `AppRunningStepTest` covering state transitions with
different numbers of executors. Also added unit tests in
`ApplicationStateSummaryTest` verifying the new state summary.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #513 from jiangzho/partial.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sparkapplications.spark.apache.org-v1.yaml | 4 +
docs/architecture.md | 9 +
.../org/apache/spark/k8s/operator/Constants.java | 7 +
.../operator/status/ApplicationStateSummary.java | 7 +
.../status/ApplicationStateSummaryTest.java | 160 ++++++++
.../operator/reconciler/SparkAppReconciler.java | 1 +
.../reconciler/reconcilesteps/AppRunningStep.java | 37 +-
.../reconcilesteps/AppRunningStepTest.java | 445 +++++++++++++++++++++
8 files changed, 666 insertions(+), 4 deletions(-)
diff --git
a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
index 75da4a7..430347b 100644
---
a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
+++
b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
@@ -23954,6 +23954,7 @@ spec:
- ResourceReleased
- RunningHealthy
- RunningWithBelowThresholdExecutors
+ - RunningWithPartialCapacity
- ScheduledToRestart
- SchedulingFailure
- Submitted
@@ -24547,6 +24548,7 @@ spec:
- ResourceReleased
- RunningHealthy
- RunningWithBelowThresholdExecutors
+ - RunningWithPartialCapacity
- ScheduledToRestart
- SchedulingFailure
- Submitted
@@ -25148,6 +25150,7 @@ spec:
- ResourceReleased
- RunningHealthy
- RunningWithBelowThresholdExecutors
+ - RunningWithPartialCapacity
- ScheduledToRestart
- SchedulingFailure
- Submitted
@@ -25742,6 +25745,7 @@ spec:
- ResourceReleased
- RunningHealthy
- RunningWithBelowThresholdExecutors
+ - RunningWithPartialCapacity
- ScheduledToRestart
- SchedulingFailure
- Submitted
diff --git a/docs/architecture.md b/docs/architecture.md
index e359c1f..6c0d027 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -67,19 +67,28 @@ stateDiagram-v2
DriverReady --> RunningHealthy
DriverReady --> InitializedBelowThresholdExecutors
+ DriverReady --> RunningWithPartialCapacity
DriverReady --> ExecutorsStartTimedOut
DriverReady --> DriverEvicted
InitializedBelowThresholdExecutors --> RunningHealthy
+ InitializedBelowThresholdExecutors --> RunningWithPartialCapacity
InitializedBelowThresholdExecutors --> Failed
RunningHealthy --> Succeeded
RunningHealthy --> RunningWithBelowThresholdExecutors
+ RunningHealthy --> RunningWithPartialCapacity
RunningHealthy --> Failed
+ RunningWithBelowThresholdExecutors --> RunningWithPartialCapacity
RunningWithBelowThresholdExecutors --> RunningHealthy
RunningWithBelowThresholdExecutors --> Failed
+ RunningWithPartialCapacity --> RunningWithBelowThresholdExecutors
+ RunningWithPartialCapacity --> RunningHealthy
+ RunningWithPartialCapacity --> Succeeded
+ RunningWithPartialCapacity --> Failed
+
state Failures {
SchedulingFailure
DriverStartTimedOut
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index fb7f617..8ce7ae7 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -164,6 +164,13 @@ public class Constants {
/** Message indicating that the application is running healthy. */
public static final String RUNNING_HEALTHY_MESSAGE = "Application is running
healthy.";
+ /**
+ * Message indicating that the application is running with partial executor
capacity (between min
+ * and max, exclusive, with dynamic allocation disabled).
+ */
+ public static final String RUNNING_WITH_PARTIAL_CAPACITY_MESSAGE =
+ "Application is running with executor count between minimum and maximum
capacity, exclusive.";
+
/**
* Message indicating that the application is running with less than the
minimal number of
* requested initial executors.
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java
index bd5c2b9..e526b98 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java
@@ -57,6 +57,13 @@ public enum ApplicationStateSummary implements
BaseStateSummary {
*/
RunningHealthy,
+ /**
+ * Application is running with dynamic allocation disabled and executor
count is between minimum
+ * and maximum thresholds, exclusive. The application is in a tolerable
state but not at optimal
+ * capacity.
+ */
+ RunningWithPartialCapacity,
+
/** The application has lost a fraction of executors for external reasons */
RunningWithBelowThresholdExecutors,
diff --git
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStateSummaryTest.java
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStateSummaryTest.java
new file mode 100644
index 0000000..0a1e0f7
--- /dev/null
+++
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/status/ApplicationStateSummaryTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.status;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class ApplicationStateSummaryTest {
+
+ @Test
+ void testIsInitializing() {
+ assertTrue(ApplicationStateSummary.Submitted.isInitializing());
+ assertTrue(ApplicationStateSummary.ScheduledToRestart.isInitializing());
+
+ assertFalse(ApplicationStateSummary.DriverRequested.isInitializing());
+ assertFalse(ApplicationStateSummary.RunningHealthy.isInitializing());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isInitializing());
+ assertFalse(ApplicationStateSummary.Failed.isInitializing());
+ }
+
+ @Test
+ void testIsStarting() {
+ // States after ScheduledToRestart but before RunningHealthy
+ assertTrue(ApplicationStateSummary.DriverRequested.isStarting());
+ assertTrue(ApplicationStateSummary.DriverStarted.isStarting());
+ assertTrue(ApplicationStateSummary.DriverReady.isStarting());
+
assertTrue(ApplicationStateSummary.InitializedBelowThresholdExecutors.isStarting());
+
+ // States at or after RunningHealthy are not starting
+ assertFalse(ApplicationStateSummary.RunningHealthy.isStarting());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isStarting());
+
assertFalse(ApplicationStateSummary.RunningWithBelowThresholdExecutors.isStarting());
+ assertFalse(ApplicationStateSummary.Failed.isStarting());
+
+ // States before ScheduledToRestart are not starting
+ assertFalse(ApplicationStateSummary.Submitted.isStarting());
+ assertFalse(ApplicationStateSummary.ScheduledToRestart.isStarting());
+ }
+
+ @Test
+ void testIsStopping() {
+ // States after RunningWithBelowThresholdExecutors but not terminated
+ assertTrue(ApplicationStateSummary.DriverStartTimedOut.isStopping());
+ assertTrue(ApplicationStateSummary.ExecutorsStartTimedOut.isStopping());
+ assertTrue(ApplicationStateSummary.DriverReadyTimedOut.isStopping());
+ assertTrue(ApplicationStateSummary.Succeeded.isStopping());
+ assertTrue(ApplicationStateSummary.Failed.isStopping());
+ assertTrue(ApplicationStateSummary.SchedulingFailure.isStopping());
+ assertTrue(ApplicationStateSummary.DriverEvicted.isStopping());
+
+ // Running states are not stopping
+ assertFalse(ApplicationStateSummary.RunningHealthy.isStopping());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isStopping());
+
assertFalse(ApplicationStateSummary.RunningWithBelowThresholdExecutors.isStopping());
+
+ // Terminated states are not considered "stopping"
+ assertFalse(ApplicationStateSummary.ResourceReleased.isStopping());
+
assertFalse(ApplicationStateSummary.TerminatedWithoutReleaseResources.isStopping());
+
+ // Earlier states are not stopping
+ assertFalse(ApplicationStateSummary.Submitted.isStopping());
+ assertFalse(ApplicationStateSummary.DriverRequested.isStopping());
+ }
+
+ @Test
+ void testIsTerminated() {
+ assertTrue(ApplicationStateSummary.ResourceReleased.isTerminated());
+
assertTrue(ApplicationStateSummary.TerminatedWithoutReleaseResources.isTerminated());
+
+ assertFalse(ApplicationStateSummary.Submitted.isTerminated());
+ assertFalse(ApplicationStateSummary.RunningHealthy.isTerminated());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isTerminated());
+ assertFalse(ApplicationStateSummary.Failed.isTerminated());
+ assertFalse(ApplicationStateSummary.Succeeded.isTerminated());
+ }
+
+ @Test
+ void testIsFailure() {
+ assertTrue(ApplicationStateSummary.DriverStartTimedOut.isFailure());
+ assertTrue(ApplicationStateSummary.ExecutorsStartTimedOut.isFailure());
+ assertTrue(ApplicationStateSummary.SchedulingFailure.isFailure());
+ assertTrue(ApplicationStateSummary.DriverEvicted.isFailure());
+ assertTrue(ApplicationStateSummary.Failed.isFailure());
+ assertTrue(ApplicationStateSummary.DriverReadyTimedOut.isFailure());
+
+ assertFalse(ApplicationStateSummary.Submitted.isFailure());
+ assertFalse(ApplicationStateSummary.RunningHealthy.isFailure());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isFailure());
+
assertFalse(ApplicationStateSummary.RunningWithBelowThresholdExecutors.isFailure());
+ assertFalse(ApplicationStateSummary.Succeeded.isFailure());
+ assertFalse(ApplicationStateSummary.ResourceReleased.isFailure());
+ }
+
+ @Test
+ void testIsInfrastructureFailure() {
+
assertTrue(ApplicationStateSummary.DriverStartTimedOut.isInfrastructureFailure());
+
assertTrue(ApplicationStateSummary.ExecutorsStartTimedOut.isInfrastructureFailure());
+
assertTrue(ApplicationStateSummary.SchedulingFailure.isInfrastructureFailure());
+
+ assertFalse(ApplicationStateSummary.Failed.isInfrastructureFailure());
+
assertFalse(ApplicationStateSummary.DriverEvicted.isInfrastructureFailure());
+
assertFalse(ApplicationStateSummary.RunningHealthy.isInfrastructureFailure());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isInfrastructureFailure());
+ }
+
+ @Test
+ void testRunningWithPartialCapacityOrdinalPosition() {
+ // Verify RunningWithPartialCapacity is positioned correctly
+ // It should be after RunningHealthy and before
RunningWithBelowThresholdExecutors
+ assertTrue(
+ ApplicationStateSummary.RunningWithPartialCapacity.ordinal()
+ > ApplicationStateSummary.RunningHealthy.ordinal());
+ assertTrue(
+ ApplicationStateSummary.RunningWithPartialCapacity.ordinal()
+ <
ApplicationStateSummary.RunningWithBelowThresholdExecutors.ordinal());
+
+ // Verify it's in the right category for helper methods
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isStarting());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isStopping());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isTerminated());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isFailure());
+
assertFalse(ApplicationStateSummary.RunningWithPartialCapacity.isInfrastructureFailure());
+ }
+
+ @Test
+ void testRunningStatesOrder() {
+ // Verify the correct ordinal order for running states
+ assertTrue(
+ ApplicationStateSummary.InitializedBelowThresholdExecutors.ordinal()
+ < ApplicationStateSummary.RunningHealthy.ordinal());
+ assertTrue(
+ ApplicationStateSummary.RunningHealthy.ordinal()
+ < ApplicationStateSummary.RunningWithPartialCapacity.ordinal());
+ assertTrue(
+ ApplicationStateSummary.RunningWithPartialCapacity.ordinal()
+ <
ApplicationStateSummary.RunningWithBelowThresholdExecutors.ordinal());
+ assertTrue(
+ ApplicationStateSummary.RunningWithBelowThresholdExecutors.ordinal()
+ < ApplicationStateSummary.DriverStartTimedOut.ordinal());
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
index 92c99c4..f0b270a 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
@@ -183,6 +183,7 @@ public class SparkAppReconciler implements
Reconciler<SparkApplication>, Cleaner
case DriverReady,
InitializedBelowThresholdExecutors,
RunningHealthy,
+ RunningWithPartialCapacity,
RunningWithBelowThresholdExecutors -> {
steps.add(new AppRunningStep());
steps.add(new AppResourceObserveStep(List.of(new
AppDriverRunningObserver())));
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
index bab1499..a9ce545 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java
@@ -65,8 +65,15 @@ public class AppRunningStep extends AppReconcileStep {
long runningExecutors =
executors.stream().filter(PodUtils::isPodReady).count();
if (prevStateSummary.isStarting()) {
if (runningExecutors >= executorInstanceConfig.getInitExecutors()) {
- proposedStateSummary = ApplicationStateSummary.RunningHealthy;
- stateMessage = RUNNING_HEALTHY_MESSAGE;
+ if (!isDynamicAllocationEnabled(context)
+ && executorInstanceConfig.getMaxExecutors() > 0
+ && runningExecutors < executorInstanceConfig.getMaxExecutors()) {
+ proposedStateSummary =
ApplicationStateSummary.RunningWithPartialCapacity;
+ stateMessage = RUNNING_WITH_PARTIAL_CAPACITY_MESSAGE;
+ } else {
+ proposedStateSummary = ApplicationStateSummary.RunningHealthy;
+ stateMessage = RUNNING_HEALTHY_MESSAGE;
+ }
} else if (runningExecutors > 0L) {
proposedStateSummary =
ApplicationStateSummary.InitializedBelowThresholdExecutors;
stateMessage = INITIALIZED_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
@@ -76,8 +83,15 @@ public class AppRunningStep extends AppReconcileStep {
}
} else {
if (runningExecutors >= executorInstanceConfig.getMinExecutors()) {
- proposedStateSummary = ApplicationStateSummary.RunningHealthy;
- stateMessage = RUNNING_HEALTHY_MESSAGE;
+ if (!isDynamicAllocationEnabled(context)
+ && executorInstanceConfig.getMaxExecutors() > 0
+ && runningExecutors < executorInstanceConfig.getMaxExecutors()) {
+ proposedStateSummary =
ApplicationStateSummary.RunningWithPartialCapacity;
+ stateMessage = RUNNING_WITH_PARTIAL_CAPACITY_MESSAGE;
+ } else {
+ proposedStateSummary = ApplicationStateSummary.RunningHealthy;
+ stateMessage = RUNNING_HEALTHY_MESSAGE;
+ }
} else {
proposedStateSummary =
ApplicationStateSummary.RunningWithBelowThresholdExecutors;
stateMessage = RUNNING_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
@@ -96,4 +110,19 @@ public class AppRunningStep extends AppReconcileStep {
context, statusRecorder, updatedStatus, completeAndDefaultRequeue());
}
}
+
+ /**
+ * Checks if dynamic allocation is enabled for the application.
+ *
+ * @param context The SparkAppContext
+ * @return true if dynamic allocation is enabled, false otherwise
+ */
+ private boolean isDynamicAllocationEnabled(SparkAppContext context) {
+ return "true".equalsIgnoreCase(
+ context
+ .getResource()
+ .getSpec()
+ .getSparkConf()
+ .getOrDefault("spark.dynamicAllocation.enabled", "false"));
+ }
}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStepTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStepTest.java
new file mode 100644
index 0000000..2d25f69
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStepTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.context.SparkAppContext;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.ApplicationTolerations;
+import org.apache.spark.k8s.operator.spec.ExecutorInstanceConfig;
+import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
+
+class AppRunningStepTest {
+ private SparkAppContext mockContext;
+ private SparkAppStatusRecorder mockRecorder;
+ private SparkApplication app;
+ private ApplicationSpec appSpec;
+ private ApplicationStatus appStatus;
+ private AppRunningStep appRunningStep;
+
+ @BeforeEach
+ void setUp() {
+ mockContext = mock(SparkAppContext.class);
+ mockRecorder = mock(SparkAppStatusRecorder.class);
+ app = new SparkApplication();
+ appSpec = new ApplicationSpec();
+ appStatus = new ApplicationStatus();
+ appRunningStep = new AppRunningStep();
+
+ app.setSpec(appSpec);
+ app.setStatus(appStatus);
+
+ when(mockContext.getResource()).thenReturn(app);
+
+ // Mock the recorder to actually update the app status when called
+ when(mockRecorder.persistStatus(any(SparkAppContext.class),
any(ApplicationStatus.class)))
+ .thenAnswer(
+ invocation -> {
+ ApplicationStatus newStatus = invocation.getArgument(1);
+ app.setStatus(newStatus);
+ return true;
+ });
+ }
+
+ private Pod createReadyExecutorPod(String name) {
+ return new PodBuilder()
+ .withNewMetadata()
+ .withName(name)
+ .endMetadata()
+ .withNewStatus()
+ .withPhase("Running")
+ .addNewCondition()
+ .withType("Ready")
+ .withStatus("True")
+ .endCondition()
+ .addNewContainerStatus()
+ .withName("executor")
+ .withReady(true)
+ .withNewState()
+ .withNewRunning()
+ .endRunning()
+ .endState()
+ .endContainerStatus()
+ .endStatus()
+ .build();
+ }
+
+ private Set<Pod> createExecutorPods(int count) {
+ Set<Pod> executors = new HashSet<>();
+ for (int k = 0; k < count; k++) {
+ executors.add(createReadyExecutorPod("executor-" + k));
+ }
+ return executors;
+ }
+
+ @Test
+ void runningWithPartialCapacityDynamicAllocationDisabled() {
+ // Dynamic allocation disabled, executors between min and max
+ Map<String, String> sparkConf = new HashMap<>();
+ sparkConf.put("spark.dynamicAllocation.enabled", "false");
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(2).minExecutors(2).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ // Current state: RunningHealthy with 5 executors (between min and max)
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.RunningHealthy,
"Previous state"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(5));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningWithPartialCapacity,
currentState);
+ }
+
+ @Test
+ void runningHealthyDynamicAllocationEnabled() {
+ // Dynamic allocation enabled, executors between min and max
+ Map<String, String> sparkConf = new HashMap<>();
+ sparkConf.put("spark.dynamicAllocation.enabled", "true");
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(2).minExecutors(2).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ // Current state: DriverReady with 5 executors
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(5));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningHealthy, currentState);
+ }
+
+ @Test
+ void runningHealthyExecutorsAtMaxCapacity() {
+ // Dynamic allocation disabled, executors at max capacity
+ Map<String, String> sparkConf = new HashMap<>();
+ sparkConf.put("spark.dynamicAllocation.enabled", "false");
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(2).minExecutors(2).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(10));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningHealthy, currentState);
+ }
+
+ @Test
+ void runningHealthyMaxExecutorsZero() {
+ // maxExecutors is 0 (special case, should always be healthy)
+ Map<String, String> sparkConf = new HashMap<>();
+ sparkConf.put("spark.dynamicAllocation.enabled", "false");
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(2).minExecutors(2).maxExecutors(0).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(5));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningHealthy, currentState);
+ }
+
+ @Test
+ void runningWithBelowThresholdExecutors() {
+ // Executors below minimum threshold
+ Map<String, String> sparkConf = new HashMap<>();
+ sparkConf.put("spark.dynamicAllocation.enabled", "false");
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(5).minExecutors(5).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.RunningHealthy,
"Previous state"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(3));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningWithBelowThresholdExecutors,
currentState);
+ }
+
+ @Test
+ void initializedBelowThresholdExecutorsDuringStartup() {
+ // During startup phase with fewer executors than initExecutors
+ Map<String, String> sparkConf = new HashMap<>();
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(5).minExecutors(2).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ // DriverReady is a starting state
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(3));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.InitializedBelowThresholdExecutors,
currentState);
+ }
+
+ @Test
+ void runningWithPartialCapacityDuringStartupWithEnoughExecutors() {
+ // During startup phase with enough executors
+ Map<String, String> sparkConf = new HashMap<>();
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(5).minExecutors(2).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(5));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningWithPartialCapacity,
currentState);
+ }
+
+ @Test
+ void runningHealthyNoExecutorConfig() {
+ // No executor instance config (should be healthy)
+ Map<String, String> sparkConf = new HashMap<>();
+ appSpec.setSparkConf(sparkConf);
+
+ ApplicationTolerations tolerations =
+ ApplicationTolerations.builder().instanceConfig(null).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(3));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningHealthy, currentState);
+ }
+
+ @Test
+ void runningHealthyInitExecutorsZero() {
+ // initExecutors is 0 (should be healthy)
+ Map<String, String> sparkConf = new HashMap<>();
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(0).minExecutors(0).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(3));
+
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningHealthy, currentState);
+ }
+
+ @Test
+ void runningWithPartialCapacityDoesNotAppendDuplicateStates() {
+ // Verify that executor count changes within partial capacity range don't
create duplicate
+ // states
+ Map<String, String> sparkConf = new HashMap<>();
+ sparkConf.put("spark.dynamicAllocation.enabled", "false");
+ appSpec.setSparkConf(sparkConf);
+
+ ExecutorInstanceConfig instanceConfig =
+
ExecutorInstanceConfig.builder().initExecutors(2).minExecutors(2).maxExecutors(10).build();
+ ApplicationTolerations tolerations =
+
ApplicationTolerations.builder().instanceConfig(instanceConfig).build();
+ appSpec.setApplicationTolerations(tolerations);
+
+ // Start with DriverReady state
+ appStatus =
+ appStatus.appendNewState(
+ new ApplicationState(ApplicationStateSummary.DriverReady, "Driver
ready"));
+ app.setStatus(appStatus);
+
+ // First reconcile: 5 executors (between min and max) -> should transition
to
+ // RunningWithPartialCapacity
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(5));
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ ApplicationStateSummary currentState =
+ app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningWithPartialCapacity,
currentState);
+ int stateCountAfterFirst =
app.getStatus().getStateTransitionHistory().size();
+
+ // Mock driver pod for subsequent reconciles that don't change state
+ Pod mockDriverPod =
+ new PodBuilder()
+ .withNewMetadata()
+ .withName("driver")
+ .endMetadata()
+ .withNewStatus()
+ .withPhase("Running")
+ .addNewCondition()
+ .withType("Ready")
+ .withStatus("True")
+ .endCondition()
+ .endStatus()
+ .build();
+ when(mockContext.getDriverPod()).thenReturn(Optional.of(mockDriverPod));
+
+ // Second reconcile: 6 executors (still between min and max) -> should NOT
append new state
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(6));
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ currentState = app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningWithPartialCapacity,
currentState);
+ int stateCountAfterSecond =
app.getStatus().getStateTransitionHistory().size();
+ assertEquals(
+ stateCountAfterFirst,
+ stateCountAfterSecond,
+ "State count should not increase when state doesn't change");
+
+ // Third reconcile: 4 executors (still between min and max) -> should NOT
append new state
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(4));
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ currentState = app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningWithPartialCapacity,
currentState);
+ int stateCountAfterThird =
app.getStatus().getStateTransitionHistory().size();
+ assertEquals(
+ stateCountAfterFirst,
+ stateCountAfterThird,
+ "State count should not increase when state doesn't change");
+
+ // Fourth reconcile: 9 executors (still between min and max) -> should NOT
append new state
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(9));
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ currentState = app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningWithPartialCapacity,
currentState);
+ int stateCountAfterFourth =
app.getStatus().getStateTransitionHistory().size();
+ assertEquals(
+ stateCountAfterFirst,
+ stateCountAfterFourth,
+ "State count should not increase when state doesn't change");
+
+ // Fifth reconcile: 10 executors (at max) -> should transition to
RunningHealthy
+
when(mockContext.getExecutorsForApplication()).thenReturn(createExecutorPods(10));
+ appRunningStep.reconcile(mockContext, mockRecorder);
+
+ currentState = app.getStatus().getCurrentState().getCurrentStateSummary();
+ assertEquals(ApplicationStateSummary.RunningHealthy, currentState);
+ int stateCountAfterFifth =
app.getStatus().getStateTransitionHistory().size();
+ assertEquals(
+ stateCountAfterFirst + 1,
+ stateCountAfterFifth,
+ "State count should increase by 1 when transitioning to
RunningHealthy");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]