This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0b732b5 [SPARK-49376] Introduce ClusterToleration and
WorkerInstanceConfig
0b732b5 is described below
commit 0b732b54ab6114bfb45617254357c49359d55ccc
Author: zhou-jiang <[email protected]>
AuthorDate: Fri Aug 23 22:41:06 2024 -0700
[SPARK-49376] Introduce ClusterToleration and WorkerInstanceConfig
### What changes were proposed in this pull request?
This PR proposes `ClusterTolerations` and relocates initWorkers / minWorkers
/ maxWorkers from the `ClusterSpec` top level to `WorkerInstanceConfig`.
### Why are the changes needed?
This would align the API schema used by app / cluster. `ClusterTolerations`
can also hold timeout configuration for clusters in future commits.
### Does this PR introduce _any_ user-facing change?
No (not released yet)
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
Pass unit test / integration test CIs
Closes #98 from jiangzho/cluster_tolerations.
Lead-authored-by: zhou-jiang <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
examples/cluster-on-yunikorn.yaml | 8 +++++---
examples/prod-cluster-with-three-workers.yaml | 8 +++++---
examples/qa-cluster-with-one-worker.yaml | 8 +++++---
.../org/apache/spark/k8s/operator/spec/ClusterSpec.java | 6 +++---
.../spec/{ClusterSpec.java => ClusterTolerations.java} | 14 ++++----------
.../spec/{ClusterSpec.java => WorkerInstanceConfig.java} | 14 ++++----------
.../apache/spark/k8s/operator/spec/ClusterSpecTest.java | 6 +++---
.../spark/k8s/operator/SparkClusterResourceSpec.java | 7 ++++++-
.../spark/k8s/operator/SparkClusterResourceSpecTest.java | 3 +++
.../k8s/operator/SparkClusterSubmissionWorkerTest.java | 3 +++
10 files changed, 41 insertions(+), 36 deletions(-)
diff --git a/examples/cluster-on-yunikorn.yaml
b/examples/cluster-on-yunikorn.yaml
index 9f1523d..0032c84 100644
--- a/examples/cluster-on-yunikorn.yaml
+++ b/examples/cluster-on-yunikorn.yaml
@@ -19,9 +19,11 @@ metadata:
spec:
runtimeVersions:
sparkVersion: "4.0.0-preview1"
- initWorkers: 1
- minWorkers: 1
- maxWorkers: 1
+ clusterTolerations:
+ instanceConfig:
+ initWorkers: 1
+ minWorkers: 1
+ maxWorkers: 1
sparkConf:
spark.kubernetes.container.image: "spark:4.0.0-preview1"
spark.kubernetes.scheduler.name: "yunikorn"
diff --git a/examples/prod-cluster-with-three-workers.yaml
b/examples/prod-cluster-with-three-workers.yaml
index 0fcbb36..d685f43 100644
--- a/examples/prod-cluster-with-three-workers.yaml
+++ b/examples/prod-cluster-with-three-workers.yaml
@@ -19,9 +19,11 @@ metadata:
spec:
runtimeVersions:
sparkVersion: "4.0.0-preview1"
- initWorkers: 3
- minWorkers: 3
- maxWorkers: 3
+ clusterTolerations:
+ instanceConfig:
+ initWorkers: 3
+ minWorkers: 3
+ maxWorkers: 3
sparkConf:
spark.kubernetes.container.image: "spark:4.0.0-preview1"
spark.master.ui.title: "Prod Spark Cluster"
diff --git a/examples/qa-cluster-with-one-worker.yaml
b/examples/qa-cluster-with-one-worker.yaml
index 7ec52e7..131808a 100644
--- a/examples/qa-cluster-with-one-worker.yaml
+++ b/examples/qa-cluster-with-one-worker.yaml
@@ -19,9 +19,11 @@ metadata:
spec:
runtimeVersions:
sparkVersion: "4.0.0-preview1"
- initWorkers: 1
- minWorkers: 1
- maxWorkers: 1
+ clusterTolerations:
+ instanceConfig:
+ initWorkers: 1
+ minWorkers: 1
+ maxWorkers: 1
sparkConf:
spark.kubernetes.container.image: "spark:4.0.0-preview1"
spark.master.ui.title: "QA Spark Cluster"
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
index ca6a6eb..b7e9fa8 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
@@ -37,7 +37,7 @@ import lombok.NoArgsConstructor;
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusterSpec extends BaseSpec {
@Required protected RuntimeVersions runtimeVersions;
- @Required protected int initWorkers;
- @Required protected int minWorkers;
- @Required protected int maxWorkers;
+
+ @Required @Builder.Default
+ protected ClusterTolerations clusterTolerations = new ClusterTolerations();
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterTolerations.java
similarity index 74%
copy from
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterTolerations.java
index ca6a6eb..d7e546b 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterTolerations.java
@@ -16,28 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.spark.k8s.operator.spec;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.generator.annotation.Required;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
-@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
- @Required protected RuntimeVersions runtimeVersions;
- @Required protected int initWorkers;
- @Required protected int minWorkers;
- @Required protected int maxWorkers;
+public class ClusterTolerations {
+ /** Determine the toleration behavior for worker instances. */
+ @Required @Builder.Default
+ protected WorkerInstanceConfig instanceConfig = new WorkerInstanceConfig();
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerInstanceConfig.java
similarity index 74%
copy from
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerInstanceConfig.java
index ca6a6eb..5adca3e 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerInstanceConfig.java
@@ -16,28 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.spark.k8s.operator.spec;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.generator.annotation.Required;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
-@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
- @Required protected RuntimeVersions runtimeVersions;
- @Required protected int initWorkers;
- @Required protected int minWorkers;
- @Required protected int maxWorkers;
+public class WorkerInstanceConfig {
+ @Required @Builder.Default protected int initWorkers = 0;
+ @Required @Builder.Default protected int minWorkers = 0;
+ @Required @Builder.Default protected int maxWorkers = 0;
}
diff --git
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
index 02b9625..2801860 100644
---
a/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
+++
b/spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/ClusterSpecTest.java
@@ -36,8 +36,8 @@ class ClusterSpecTest {
void testInitSpecWithDefaults() {
ClusterSpec spec1 = new ClusterSpec();
assertNull(spec1.runtimeVersions);
- assertEquals(0, spec1.initWorkers);
- assertEquals(0, spec1.minWorkers);
- assertEquals(0, spec1.maxWorkers);
+ assertEquals(0, spec1.clusterTolerations.instanceConfig.initWorkers);
+ assertEquals(0, spec1.clusterTolerations.instanceConfig.minWorkers);
+ assertEquals(0, spec1.clusterTolerations.instanceConfig.maxWorkers);
}
}
diff --git
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index 23265b0..d5c0324 100644
---
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -59,7 +59,12 @@ public class SparkClusterResourceSpec {
buildMasterStatefulSet(scheduler, clusterName, namespace, image,
options.toString());
workerStatefulSet =
buildWorkerStatefulSet(
- scheduler, clusterName, namespace, image, spec.getInitWorkers(),
options.toString());
+ scheduler,
+ clusterName,
+ namespace,
+ image,
+ spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
+ options.toString());
}
private static Service buildMasterService(String name, String namespace) {
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index 7d0a2ed..ef1cbaf 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -31,12 +31,14 @@ import org.junit.jupiter.api.Test;
import org.apache.spark.SparkConf;
import org.apache.spark.k8s.operator.spec.ClusterSpec;
+import org.apache.spark.k8s.operator.spec.ClusterTolerations;
class SparkClusterResourceSpecTest {
SparkCluster cluster;
ObjectMeta objectMeta;
ClusterSpec clusterSpec;
SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace",
"other-namespace");
+ ClusterTolerations clusterTolerations = new ClusterTolerations();
@BeforeEach
void setUp() {
@@ -47,6 +49,7 @@ class SparkClusterResourceSpecTest {
when(cluster.getSpec()).thenReturn(clusterSpec);
when(objectMeta.getNamespace()).thenReturn("my-namespace");
when(objectMeta.getName()).thenReturn("cluster-name");
+ when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
}
@Test
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index 7f4a972..b0d02c4 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -29,11 +29,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.spark.k8s.operator.spec.ClusterSpec;
+import org.apache.spark.k8s.operator.spec.ClusterTolerations;
class SparkClusterSubmissionWorkerTest {
SparkCluster cluster;
ObjectMeta objectMeta;
ClusterSpec clusterSpec;
+ ClusterTolerations clusterTolerations = new ClusterTolerations();
@BeforeEach
void setUp() {
@@ -44,6 +46,7 @@ class SparkClusterSubmissionWorkerTest {
when(cluster.getSpec()).thenReturn(clusterSpec);
when(objectMeta.getNamespace()).thenReturn("my-namespace");
when(objectMeta.getName()).thenReturn("cluster-name");
+ when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]