This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a2aa452 [SPARK-49329] Support user provided spec for SparkCluster
a2aa452 is described below
commit a2aa4521bc887b6aecc6c755f90b770ad4d6ee4f
Author: zhou-jiang <[email protected]>
AuthorDate: Sun Aug 25 14:25:13 2024 -0700
[SPARK-49329] Support user provided spec for SparkCluster
### What changes were proposed in this pull request?
This PR introduces the feature to enable user-provided metadata & spec for
Spark Clusters.
### Why are the changes needed?
Similar to pod template spec support for Apps, this is desired when user
would like to introduce customization for Cluster master + worker spec.
### Does this PR introduce _any_ user-facing change?
No - not released yet
### How was this patch tested?
Unit tests and integration test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #80 from jiangzho/cluster_api.
Authored-by: zhou-jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/k8s/operator/spec/ClusterSpec.java | 3 +
.../spec/{ClusterSpec.java => MasterSpec.java} | 18 ++-
.../spec/{ClusterSpec.java => WorkerSpec.java} | 18 ++-
.../k8s/operator/SparkClusterResourceSpec.java | 84 +++++++++----
.../k8s/operator/SparkClusterResourceSpecTest.java | 132 +++++++++++++++++++++
.../operator/SparkClusterSubmissionWorkerTest.java | 8 ++
6 files changed, 219 insertions(+), 44 deletions(-)
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
index b7e9fa8..6786abb 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
@@ -40,4 +40,7 @@ public class ClusterSpec extends BaseSpec {
@Required @Builder.Default
protected ClusterTolerations clusterTolerations = new ClusterTolerations();
+
+ @Builder.Default protected MasterSpec masterSpec = new
MasterSpec.MasterSpecBuilder().build();
+ @Builder.Default protected WorkerSpec workerSpec = new
WorkerSpec.WorkerSpecBuilder().build();
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/MasterSpec.java
similarity index 72%
copy from
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/MasterSpec.java
index b7e9fa8..7becfc0 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/MasterSpec.java
@@ -19,25 +19,23 @@
package org.apache.spark.k8s.operator.spec;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
-import io.fabric8.generator.annotation.Required;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
-@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
- @Required protected RuntimeVersions runtimeVersions;
-
- @Required @Builder.Default
- protected ClusterTolerations clusterTolerations = new ClusterTolerations();
+public class MasterSpec {
+ protected StatefulSetSpec masterStatefulSetSpec;
+ protected ObjectMeta masterStatefulSetMetadata;
+ protected ServiceSpec masterServiceSpec;
+ protected ObjectMeta masterServiceMetadata;
}
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerSpec.java
similarity index 72%
copy from
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerSpec.java
index b7e9fa8..2c5beb1 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerSpec.java
@@ -19,25 +19,23 @@
package org.apache.spark.k8s.operator.spec;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
-import io.fabric8.generator.annotation.Required;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
-@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
- @Required protected RuntimeVersions runtimeVersions;
-
- @Required @Builder.Default
- protected ClusterTolerations clusterTolerations = new ClusterTolerations();
+public class WorkerSpec {
+ protected StatefulSetSpec workerStatefulSetSpec;
+ protected ObjectMeta workerStatefulSetMetadata;
+ protected ServiceSpec workerServiceSpec;
+ protected ObjectMeta workerServiceMetadata;
}
diff --git
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index d5c0324..390af3d 100644
---
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -25,15 +25,20 @@ import java.util.Collections;
import scala.Tuple2;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
import lombok.Getter;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.k8s.Config;
import org.apache.spark.k8s.operator.spec.ClusterSpec;
+import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.WorkerSpec;
/** Spark Cluster Resource Spec: Master Service, Master StatefulSet, Worker
StatefulSet */
public class SparkClusterResourceSpec {
@@ -53,10 +58,29 @@ public class SparkClusterResourceSpec {
for (Tuple2<String, String> t : conf.getAll()) {
options.append(String.format("-D%s=\"%s\" ", t._1, t._2));
}
- masterService = buildMasterService(clusterName, namespace);
- workerService = buildWorkerService(clusterName, namespace);
+ MasterSpec masterSpec = spec.getMasterSpec();
+ WorkerSpec workerSpec = spec.getWorkerSpec();
+ masterService =
+ buildMasterService(
+ clusterName,
+ namespace,
+ masterSpec.getMasterServiceMetadata(),
+ masterSpec.getMasterServiceSpec());
+ workerService =
+ buildWorkerService(
+ clusterName,
+ namespace,
+ workerSpec.getWorkerServiceMetadata(),
+ workerSpec.getWorkerServiceSpec());
masterStatefulSet =
- buildMasterStatefulSet(scheduler, clusterName, namespace, image,
options.toString());
+ buildMasterStatefulSet(
+ scheduler,
+ clusterName,
+ namespace,
+ image,
+ options.toString(),
+ masterSpec.getMasterStatefulSetMetadata(),
+ masterSpec.getMasterStatefulSetSpec());
workerStatefulSet =
buildWorkerStatefulSet(
scheduler,
@@ -64,17 +88,20 @@ public class SparkClusterResourceSpec {
namespace,
image,
spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
- options.toString());
+ options.toString(),
+ workerSpec.getWorkerStatefulSetMetadata(),
+ workerSpec.getWorkerStatefulSetSpec());
}
- private static Service buildMasterService(String name, String namespace) {
+ private static Service buildMasterService(
+ String name, String namespace, ObjectMeta metadata, ServiceSpec
serviceSpec) {
return new ServiceBuilder()
- .withNewMetadata()
+ .withNewMetadataLike(metadata)
.withName(name + "-master-svc")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
.withNamespace(namespace)
.endMetadata()
- .withNewSpec()
+ .withNewSpecLike(serviceSpec)
.withClusterIP("None")
.withSelector(
Collections.singletonMap(LABEL_SPARK_ROLE_NAME,
LABEL_SPARK_ROLE_MASTER_VALUE))
@@ -97,14 +124,15 @@ public class SparkClusterResourceSpec {
.build();
}
- private static Service buildWorkerService(String name, String namespace) {
+ private static Service buildWorkerService(
+ String name, String namespace, ObjectMeta metadata, ServiceSpec
serviceSpec) {
return new ServiceBuilder()
- .withNewMetadata()
+ .withNewMetadataLike(metadata)
.withName(name + "-worker-svc")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
.withNamespace(namespace)
.endMetadata()
- .withNewSpec()
+ .withNewSpecLike(serviceSpec)
.withClusterIP("None")
.withSelector(
Collections.singletonMap(LABEL_SPARK_ROLE_NAME,
LABEL_SPARK_ROLE_WORKER_VALUE))
@@ -118,24 +146,30 @@ public class SparkClusterResourceSpec {
}
private static StatefulSet buildMasterStatefulSet(
- String scheduler, String name, String namespace, String image, String
options) {
+ String scheduler,
+ String name,
+ String namespace,
+ String image,
+ String options,
+ ObjectMeta objectMeta,
+ StatefulSetSpec statefulSetSpec) {
return new StatefulSetBuilder()
- .withNewMetadata()
+ .withNewMetadataLike(objectMeta)
.withName(name + "-master")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
.withNamespace(namespace)
.endMetadata()
- .withNewSpec()
+ .withNewSpecLike(statefulSetSpec)
.withPodManagementPolicy("Parallel")
.withReplicas(1)
- .withNewSelector()
+ .editOrNewSelector()
.addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
.endSelector()
- .withNewTemplate()
- .withNewMetadata()
+ .editOrNewTemplate()
+ .editOrNewMetadata()
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
.endMetadata()
- .withNewSpec()
+ .editOrNewSpec()
.withSchedulerName(scheduler)
.withTerminationGracePeriodSeconds(0L)
.addNewContainer()
@@ -176,25 +210,27 @@ public class SparkClusterResourceSpec {
String namespace,
String image,
int initWorkers,
- String options) {
+ String options,
+ ObjectMeta metadata,
+ StatefulSetSpec statefulSetSpec) {
return new StatefulSetBuilder()
- .withNewMetadata()
+ .withNewMetadataLike(metadata)
.withName(name + "-worker")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
.withNamespace(namespace)
.endMetadata()
- .withNewSpec()
+ .withNewSpecLike(statefulSetSpec)
.withPodManagementPolicy("Parallel")
.withReplicas(initWorkers)
.withServiceName(name + "-worker-svc")
- .withNewSelector()
+ .editOrNewSelector()
.addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
.endSelector()
- .withNewTemplate()
- .withNewMetadata()
+ .editOrNewTemplate()
+ .editOrNewMetadata()
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
.endMetadata()
- .withNewSpec()
+ .editOrNewSpec()
.withSchedulerName(scheduler)
.withTerminationGracePeriodSeconds(0L)
.withNewDnsConfig()
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index ef1cbaf..3a6ae7e 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -24,19 +24,30 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
+import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpecBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.spark.SparkConf;
import org.apache.spark.k8s.operator.spec.ClusterSpec;
import org.apache.spark.k8s.operator.spec.ClusterTolerations;
+import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.WorkerSpec;
class SparkClusterResourceSpecTest {
SparkCluster cluster;
ObjectMeta objectMeta;
ClusterSpec clusterSpec;
+ StatefulSetSpec statefulSetSpec;
+ ServiceSpec serviceSpec;
+ MasterSpec masterSpec;
+ WorkerSpec workerSpec;
SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace",
"other-namespace");
ClusterTolerations clusterTolerations = new ClusterTolerations();
@@ -45,11 +56,25 @@ class SparkClusterResourceSpecTest {
cluster = mock(SparkCluster.class);
objectMeta = mock(ObjectMeta.class);
clusterSpec = mock(ClusterSpec.class);
+ serviceSpec = mock(ServiceSpec.class);
+ masterSpec = mock(MasterSpec.class);
+ workerSpec = mock(WorkerSpec.class);
+ statefulSetSpec = mock(StatefulSetSpec.class);
when(cluster.getMetadata()).thenReturn(objectMeta);
when(cluster.getSpec()).thenReturn(clusterSpec);
when(objectMeta.getNamespace()).thenReturn("my-namespace");
when(objectMeta.getName()).thenReturn("cluster-name");
when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+ when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
+ when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+ when(masterSpec.getMasterStatefulSetSpec()).thenReturn(statefulSetSpec);
+ when(masterSpec.getMasterStatefulSetMetadata()).thenReturn(objectMeta);
+ when(masterSpec.getMasterServiceSpec()).thenReturn(serviceSpec);
+ when(masterSpec.getMasterServiceMetadata()).thenReturn(objectMeta);
+ when(workerSpec.getWorkerStatefulSetSpec()).thenReturn(statefulSetSpec);
+ when(workerSpec.getWorkerStatefulSetMetadata()).thenReturn(objectMeta);
+ when(workerSpec.getWorkerServiceSpec()).thenReturn(serviceSpec);
+ when(workerSpec.getWorkerServiceMetadata()).thenReturn(objectMeta);
}
@Test
@@ -72,6 +97,48 @@ class SparkClusterResourceSpecTest {
assertEquals("other-namespace", service2.getMetadata().getNamespace());
}
+ @Test
+ void testWorkerServiceWithTemplate() {
+ ObjectMeta objectMeta1 =
+ new ObjectMetaBuilder()
+ .withNamespace("foo")
+ .withName("bar")
+ .addToLabels("foo", "bar")
+ .build();
+ ServiceSpec serviceSpec1 = new
ServiceSpecBuilder().withExternalName("foo").build();
+ WorkerSpec workerSpec1 = mock(WorkerSpec.class);
+ when(workerSpec1.getWorkerServiceSpec()).thenReturn(serviceSpec1);
+ when(workerSpec1.getWorkerServiceMetadata()).thenReturn(objectMeta1);
+ when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec1);
+
+ Service service1 = new SparkClusterResourceSpec(cluster, new
SparkConf()).getWorkerService();
+ assertEquals("my-namespace", service1.getMetadata().getNamespace());
+ assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
+ assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+ assertEquals("foo", service1.getSpec().getExternalName());
+ }
+
+ @Test
+ void testMasterServiceWithTemplate() {
+ ObjectMeta objectMeta1 =
+ new ObjectMetaBuilder()
+ .withNamespace("foo")
+ .withName("bar")
+ .addToLabels("foo", "bar")
+ .build();
+ ServiceSpec serviceSpec1 = new
ServiceSpecBuilder().withExternalName("foo").build();
+ MasterSpec masterSpec1 = mock(MasterSpec.class);
+ when(masterSpec1.getMasterServiceSpec()).thenReturn(serviceSpec1);
+ when(masterSpec1.getMasterServiceMetadata()).thenReturn(objectMeta1);
+ when(clusterSpec.getMasterSpec()).thenReturn(masterSpec1);
+
+ Service service1 = new SparkClusterResourceSpec(cluster, new
SparkConf()).getMasterService();
+ assertEquals("my-namespace", service1.getMetadata().getNamespace());
+ assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
+ assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+ assertEquals("foo", service1.getSpec().getExternalName());
+ }
+
@Test
void testMasterStatefulSet() {
SparkClusterResourceSpec spec1 = new SparkClusterResourceSpec(cluster, new
SparkConf());
@@ -84,6 +151,40 @@ class SparkClusterResourceSpecTest {
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
}
+ @Test
+ void testMasterStatefulSetWithTemplate() {
+ ObjectMeta objectMeta1 =
+ new ObjectMetaBuilder()
+ .withNamespace("foo")
+ .withName("bar")
+ .addToLabels("foo", "bar")
+ .build();
+ StatefulSetSpec statefulSetSpec1 =
+ new StatefulSetSpecBuilder()
+ .withNewTemplate()
+ .withNewSpec()
+ .addNewInitContainer()
+ .withName("init-foo")
+ .endInitContainer()
+ .addNewContainer()
+ .withName("sidecar-foo")
+ .endContainer()
+ .endSpec()
+ .endTemplate()
+ .build();
+ MasterSpec masterSpec1 = mock(MasterSpec.class);
+ when(masterSpec1.getMasterStatefulSetMetadata()).thenReturn(objectMeta1);
+ when(masterSpec1.getMasterStatefulSetSpec()).thenReturn(statefulSetSpec1);
+ when(clusterSpec.getMasterSpec()).thenReturn(masterSpec1);
+ SparkClusterResourceSpec spec1 = new SparkClusterResourceSpec(cluster, new
SparkConf());
+ StatefulSet statefulSet1 = spec1.getMasterStatefulSet();
+ assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
+ assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
+ assertEquals("bar", statefulSet1.getMetadata().getLabels().get("foo"));
+ assertEquals(1,
statefulSet1.getSpec().getTemplate().getSpec().getInitContainers().size());
+ assertEquals(2,
statefulSet1.getSpec().getTemplate().getSpec().getContainers().size());
+ }
+
@Test
void testWorkerStatefulSet() {
SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new
SparkConf());
@@ -95,4 +196,35 @@ class SparkClusterResourceSpecTest {
StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
}
+
+ @Test
+ void testWorkerStatefulSetWithTemplate() {
+ ObjectMeta objectMeta1 =
+ new ObjectMetaBuilder()
+ .withNamespace("foo")
+ .withName("bar")
+ .addToLabels("foo", "bar")
+ .build();
+ StatefulSetSpec statefulSetSpec1 =
+ new StatefulSetSpecBuilder()
+ .withNewTemplate()
+ .withNewSpec()
+ .addNewInitContainer()
+ .withName("init-foo")
+ .endInitContainer()
+ .addNewContainer()
+ .withName("sidecar-foo")
+ .endContainer()
+ .endSpec()
+ .endTemplate()
+ .build();
+ WorkerSpec workerSpec1 = mock(WorkerSpec.class);
+ when(workerSpec1.getWorkerStatefulSetMetadata()).thenReturn(objectMeta1);
+ when(workerSpec1.getWorkerStatefulSetSpec()).thenReturn(statefulSetSpec1);
+ when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec1);
+ SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new
SparkConf());
+ StatefulSet statefulSet = spec.getWorkerStatefulSet();
+ assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
+ assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+ }
}
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index b0d02c4..0e2ea9b 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -30,23 +30,31 @@ import org.junit.jupiter.api.Test;
import org.apache.spark.k8s.operator.spec.ClusterSpec;
import org.apache.spark.k8s.operator.spec.ClusterTolerations;
+import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.WorkerSpec;
class SparkClusterSubmissionWorkerTest {
SparkCluster cluster;
ObjectMeta objectMeta;
ClusterSpec clusterSpec;
ClusterTolerations clusterTolerations = new ClusterTolerations();
+ MasterSpec masterSpec;
+ WorkerSpec workerSpec;
@BeforeEach
void setUp() {
cluster = mock(SparkCluster.class);
objectMeta = mock(ObjectMeta.class);
clusterSpec = mock(ClusterSpec.class);
+ masterSpec = mock(MasterSpec.class);
+ workerSpec = mock(WorkerSpec.class);
when(cluster.getMetadata()).thenReturn(objectMeta);
when(cluster.getSpec()).thenReturn(clusterSpec);
when(objectMeta.getNamespace()).thenReturn("my-namespace");
when(objectMeta.getName()).thenReturn("cluster-name");
when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+ when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
+ when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]