This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b732b5  [SPARK-49376] Introduce ClusterToleration and 
WorkerInstanceConfig
0b732b5 is described below

commit 0b732b54ab6114bfb45617254357c49359d55ccc
Author: zhou-jiang <[email protected]>
AuthorDate: Fri Aug 23 22:41:06 2024 -0700

    [SPARK-49376] Introduce ClusterToleration and WorkerInstanceConfig
    
    ### What changes were proposed in this pull request?
    
    This PR proposes `ClusterTolerations` and relocates initWorkers / minWorkers 
/ maxWorkers from `ClusterSpec` top level to `WorkerInstanceConfig`.
    
    ### Why are the changes needed?
    
    This would align the API schema used by app / cluster. `ClusterTolerations` 
can also serve timeout config for clusters in future commits.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No (not released yet)
    
    ### How was this patch tested?
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Pass unit test / integration test CIs
    
    Closes #98 from jiangzho/cluster_tolerations.
    
    Lead-authored-by: zhou-jiang <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 examples/cluster-on-yunikorn.yaml                          |  8 +++++---
 examples/prod-cluster-with-three-workers.yaml              |  8 +++++---
 examples/qa-cluster-with-one-worker.yaml                   |  8 +++++---
 .../org/apache/spark/k8s/operator/spec/ClusterSpec.java    |  6 +++---
 .../spec/{ClusterSpec.java => ClusterTolerations.java}     | 14 ++++----------
 .../spec/{ClusterSpec.java => WorkerInstanceConfig.java}   | 14 ++++----------
 .../apache/spark/k8s/operator/spec/ClusterSpecTest.java    |  6 +++---
 .../spark/k8s/operator/SparkClusterResourceSpec.java       |  7 ++++++-
 .../spark/k8s/operator/SparkClusterResourceSpecTest.java   |  3 +++
 .../k8s/operator/SparkClusterSubmissionWorkerTest.java     |  3 +++
 10 files changed, 41 insertions(+), 36 deletions(-)

diff --git a/examples/cluster-on-yunikorn.yaml 
b/examples/cluster-on-yunikorn.yaml
index 9f1523d..0032c84 100644
--- a/examples/cluster-on-yunikorn.yaml
+++ b/examples/cluster-on-yunikorn.yaml
@@ -19,9 +19,11 @@ metadata:
 spec:
   runtimeVersions:
     sparkVersion: "4.0.0-preview1"
-  initWorkers: 1
-  minWorkers: 1
-  maxWorkers: 1
+  clusterTolerations:
+    instanceConfig:
+      initWorkers: 1
+      minWorkers: 1
+      maxWorkers: 1
   sparkConf:
     spark.kubernetes.container.image: "spark:4.0.0-preview1"
     spark.kubernetes.scheduler.name: "yunikorn"
diff --git a/examples/prod-cluster-with-three-workers.yaml 
b/examples/prod-cluster-with-three-workers.yaml
index 0fcbb36..d685f43 100644
--- a/examples/prod-cluster-with-three-workers.yaml
+++ b/examples/prod-cluster-with-three-workers.yaml
@@ -19,9 +19,11 @@ metadata:
 spec:
   runtimeVersions:
     sparkVersion: "4.0.0-preview1"
-  initWorkers: 3
-  minWorkers: 3
-  maxWorkers: 3
+  clusterTolerations:
+    instanceConfig:
+      initWorkers: 3
+      minWorkers: 3
+      maxWorkers: 3
   sparkConf:
     spark.kubernetes.container.image: "spark:4.0.0-preview1"
     spark.master.ui.title: "Prod Spark Cluster"
diff --git a/examples/qa-cluster-with-one-worker.yaml 
b/examples/qa-cluster-with-one-worker.yaml
index 7ec52e7..131808a 100644
--- a/examples/qa-cluster-with-one-worker.yaml
+++ b/examples/qa-cluster-with-one-worker.yaml
@@ -19,9 +19,11 @@ metadata:
 spec:
   runtimeVersions:
     sparkVersion: "4.0.0-preview1"
-  initWorkers: 1
-  minWorkers: 1
-  maxWorkers: 1
+  clusterTolerations:
+    instanceConfig:
+      initWorkers: 1
+      minWorkers: 1
+      maxWorkers: 1
   sparkConf:
     spark.kubernetes.container.image: "spark:4.0.0-preview1"
     spark.master.ui.title: "QA Spark Cluster"
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
index ca6a6eb..b7e9fa8 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
@@ -37,7 +37,7 @@ import lombok.NoArgsConstructor;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class ClusterSpec extends BaseSpec {
   @Required protected RuntimeVersions runtimeVersions;
-  @Required protected int initWorkers;
-  @Required protected int minWorkers;
-  @Required protected int maxWorkers;
+
+  @Required @Builder.Default
+  protected ClusterTolerations clusterTolerations = new ClusterTolerations();
 }
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterTolerations.java
similarity index 74%
copy from 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterTolerations.java
index ca6a6eb..d7e546b 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterTolerations.java
@@ -16,28 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.spark.k8s.operator.spec;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.fabric8.generator.annotation.Required;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 @Builder
-@EqualsAndHashCode(callSuper = true)
 @JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
-  @Required protected RuntimeVersions runtimeVersions;
-  @Required protected int initWorkers;
-  @Required protected int minWorkers;
-  @Required protected int maxWorkers;
+public class ClusterTolerations {
+  /** Determine the toleration behavior for worker instances. */
+  @Required @Builder.Default
+  protected WorkerInstanceConfig instanceConfig = new WorkerInstanceConfig();
 }
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerInstanceConfig.java
similarity index 74%
copy from 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerInstanceConfig.java
index ca6a6eb..5adca3e 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerInstanceConfig.java
@@ -16,28 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.spark.k8s.operator.spec;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.fabric8.generator.annotation.Required;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 @Builder
-@EqualsAndHashCode(callSuper = true)
 @JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
-  @Required protected RuntimeVersions runtimeVersions;
-  @Required protected int initWorkers;
-  @Required protected int minWorkers;
-  @Required protected int maxWorkers;
+public class WorkerInstanceConfig {
+  @Required @Builder.Default protected int initWorkers = 0;
+  @Required @Builder.Default protected int minWorkers = 0;
+  @Required @Builder.Default protected int maxWorkers = 0;
 }
diff --git 
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
 
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
index 02b9625..2801860 100644
--- 
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
+++ 
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
@@ -36,8 +36,8 @@ class ClusterSpecTest {
   void testInitSpecWithDefaults() {
     ClusterSpec spec1 = new ClusterSpec();
     assertNull(spec1.runtimeVersions);
-    assertEquals(0, spec1.initWorkers);
-    assertEquals(0, spec1.minWorkers);
-    assertEquals(0, spec1.maxWorkers);
+    assertEquals(0, spec1.clusterTolerations.instanceConfig.initWorkers);
+    assertEquals(0, spec1.clusterTolerations.instanceConfig.minWorkers);
+    assertEquals(0, spec1.clusterTolerations.instanceConfig.maxWorkers);
   }
 }
diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index 23265b0..d5c0324 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -59,7 +59,12 @@ public class SparkClusterResourceSpec {
         buildMasterStatefulSet(scheduler, clusterName, namespace, image, 
options.toString());
     workerStatefulSet =
         buildWorkerStatefulSet(
-            scheduler, clusterName, namespace, image, spec.getInitWorkers(), 
options.toString());
+            scheduler,
+            clusterName,
+            namespace,
+            image,
+            spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
+            options.toString());
   }
 
   private static Service buildMasterService(String name, String namespace) {
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index 7d0a2ed..ef1cbaf 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -31,12 +31,14 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
+import org.apache.spark.k8s.operator.spec.ClusterTolerations;
 
 class SparkClusterResourceSpecTest {
   SparkCluster cluster;
   ObjectMeta objectMeta;
   ClusterSpec clusterSpec;
   SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace", 
"other-namespace");
+  ClusterTolerations clusterTolerations = new ClusterTolerations();
 
   @BeforeEach
   void setUp() {
@@ -47,6 +49,7 @@ class SparkClusterResourceSpecTest {
     when(cluster.getSpec()).thenReturn(clusterSpec);
     when(objectMeta.getNamespace()).thenReturn("my-namespace");
     when(objectMeta.getName()).thenReturn("cluster-name");
+    when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
   }
 
   @Test
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index 7f4a972..b0d02c4 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -29,11 +29,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
+import org.apache.spark.k8s.operator.spec.ClusterTolerations;
 
 class SparkClusterSubmissionWorkerTest {
   SparkCluster cluster;
   ObjectMeta objectMeta;
   ClusterSpec clusterSpec;
+  ClusterTolerations clusterTolerations = new ClusterTolerations();
 
   @BeforeEach
   void setUp() {
@@ -44,6 +46,7 @@ class SparkClusterSubmissionWorkerTest {
     when(cluster.getSpec()).thenReturn(clusterSpec);
     when(objectMeta.getNamespace()).thenReturn("my-namespace");
     when(objectMeta.getName()).thenReturn("cluster-name");
+    when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to