This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 3ef08fd [SPARK-53911] Support `SPARK_VERSION` placeholder in container image names
3ef08fd is described below
commit 3ef08fd56ac7de945d5139d7f6da3f86c05c9dd1
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Oct 14 19:43:40 2025 -0700
[SPARK-53911] Support `SPARK_VERSION` placeholder in container image names
### What changes were proposed in this pull request?
This PR allows users to use `{{SPARK_VERSION}}` in the following three configs.
```
spark.kubernetes.container.image
spark.kubernetes.driver.container.image
spark.kubernetes.executor.container.image
```
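As an illustration (not part of this patch), the following minimal Java sketch shows how such a value is expected to resolve at submission time; the image name and the `4.1.0` version are assumed example values, while the real version comes from `RuntimeVersions.getSparkVersion()` in the application or cluster spec.
```
public class SparkVersionPlaceholderExample {
  public static void main(String[] args) {
    // Value as written by the user in sparkConf (assumed example image name).
    String image = "apache/spark:{{SPARK_VERSION}}";
    // In the operator, this would come from RuntimeVersions.getSparkVersion().
    String sparkVersion = "4.1.0";
    // Same substitution the submission workers apply to the container image configs.
    System.out.println(image.replace("{{SPARK_VERSION}}", sparkVersion)); // apache/spark:4.1.0
  }
}
```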
### Why are the changes needed?
We should implement this in `Apache Spark K8s Operator` for feature parity with Apache Spark 4.1.0. Otherwise, the placeholder would be replaced with the operator's compile-time Spark dependency version, `4.1.0-preview2`, instead of the value of `RuntimeVersions.sparkVersion`.
- https://github.com/apache/spark/pull/51592
### Does this PR introduce _any_ user-facing change?
No, because `{{SPARK_VERSION}}` is a newly supported pattern.
### How was this patch tested?
Pass the CIs with newly added test cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #391 from dongjoon-hyun/SPARK-53911.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../k8s/operator/SparkAppSubmissionWorker.java | 10 +++++---
.../k8s/operator/SparkClusterSubmissionWorker.java | 8 +++++++
.../k8s/operator/SparkAppSubmissionWorkerTest.java | 28 +++++++++++++++++++++-
.../operator/SparkClusterSubmissionWorkerTest.java | 24 ++++++++++++++++++-
4 files changed, 65 insertions(+), 5 deletions(-)
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 096f8fe..ddda781 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -124,10 +124,16 @@ public class SparkAppSubmissionWorker {
protected SparkAppDriverConf buildDriverConf(
SparkApplication app, Map<String, String> confOverrides) {
ApplicationSpec applicationSpec = app.getSpec();
+ RuntimeVersions versions = applicationSpec.getRuntimeVersions();
+ String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
SparkConf effectiveSparkConf = new SparkConf();
if (!applicationSpec.getSparkConf().isEmpty()) {
for (String confKey : applicationSpec.getSparkConf().keySet()) {
- effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey));
+ String value = applicationSpec.getSparkConf().get(confKey);
+ if (confKey.startsWith("spark.kubernetes.") && confKey.endsWith("container.image")) {
+ value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+ }
+ effectiveSparkConf.set(confKey, value);
}
}
if (!confOverrides.isEmpty()) {
@@ -159,8 +165,6 @@ public class SparkAppSubmissionWorker {
sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
String appId = generateSparkAppId(app);
effectiveSparkConf.setIfMissing("spark.app.id", appId);
- RuntimeVersions versions = applicationSpec.getRuntimeVersions();
- String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
return SparkAppDriverConf.create(
effectiveSparkConf,
sparkVersion,
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java
index 94760ff..38b49e3 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java
@@ -22,6 +22,7 @@ package org.apache.spark.k8s.operator;
import java.util.Map;
import org.apache.spark.SparkConf;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
/** Worker for submitting Spark clusters. */
public class SparkClusterSubmissionWorker {
@@ -34,12 +35,19 @@ public class SparkClusterSubmissionWorker {
*/
public SparkClusterResourceSpec getResourceSpec(
SparkCluster cluster, Map<String, String> confOverrides) {
+ RuntimeVersions versions = cluster.getSpec().getRuntimeVersions();
+ String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
SparkConf effectiveSparkConf = new SparkConf();
Map<String, String> confFromSpec = cluster.getSpec().getSparkConf();
if (!confFromSpec.isEmpty()) {
for (Map.Entry<String, String> entry : confFromSpec.entrySet()) {
effectiveSparkConf.set(entry.getKey(), entry.getValue());
+ String value = entry.getValue();
+ if ("spark.kubernetes.container.image".equals(entry.getKey())) {
+ value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+ }
+ effectiveSparkConf.set(entry.getKey(), value);
}
}
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index 0dcc43b..6d080cf 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -44,6 +44,7 @@ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
import org.apache.spark.deploy.k8s.submit.RMainAppResource;
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.status.AttemptInfo;
@@ -260,6 +261,31 @@ class SparkAppSubmissionWorkerTest {
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
- assertEquals(conf.appId(), "foo");
+ assertEquals("foo", conf.appId());
+ }
+
+ @Test
+ void supportSparkVersionPlaceHolder() {
+ SparkApplication mockApp = mock(SparkApplication.class);
+ ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+ RuntimeVersions mockRuntimeVersions = mock(RuntimeVersions.class);
+ Map<String, String> appProps = new HashMap<>();
+ appProps.put("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}");
+ appProps.put("spark.kubernetes.driver.container.image", "apache/spark:{{SPARK_VERSION}}");
+ appProps.put("spark.kubernetes.executor.container.image", "apache/spark:{{SPARK_VERSION}}");
+ appProps.put("spark.kubernetes.key", "apache/spark:{{SPARK_VERSION}}");
+ ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+ when(mockSpec.getSparkConf()).thenReturn(appProps);
+ when(mockApp.getSpec()).thenReturn(mockSpec);
+ when(mockApp.getMetadata()).thenReturn(appMeta);
+ when(mockSpec.getRuntimeVersions()).thenReturn(mockRuntimeVersions);
+ when(mockRuntimeVersions.getSparkVersion()).thenReturn("dev");
+
+ SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+ SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+ assertEquals("apache/spark:dev", conf.get("spark.kubernetes.container.image"));
+ assertEquals("apache/spark:dev", conf.get("spark.kubernetes.driver.container.image"));
+ assertEquals("apache/spark:dev", conf.get("spark.kubernetes.executor.container.image"));
+ assertEquals("apache/spark:{{SPARK_VERSION}}", conf.get("spark.kubernetes.key"));
}
}
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index 6418292..3391755 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -23,6 +23,8 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import org.junit.jupiter.api.BeforeEach;
@@ -41,7 +43,7 @@ class SparkClusterSubmissionWorkerTest {
ClusterTolerations clusterTolerations = new ClusterTolerations();
MasterSpec masterSpec;
WorkerSpec workerSpec;
- RuntimeVersions runtimeVersions = new RuntimeVersions();
+ RuntimeVersions runtimeVersions;
@BeforeEach
void setUp() {
@@ -50,6 +52,7 @@ class SparkClusterSubmissionWorkerTest {
clusterSpec = mock(ClusterSpec.class);
masterSpec = mock(MasterSpec.class);
workerSpec = mock(WorkerSpec.class);
+ runtimeVersions = mock(RuntimeVersions.class);
when(cluster.getMetadata()).thenReturn(objectMeta);
when(cluster.getSpec()).thenReturn(clusterSpec);
when(objectMeta.getNamespace()).thenReturn("my-namespace");
@@ -58,6 +61,10 @@ class SparkClusterSubmissionWorkerTest {
when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
+ when(runtimeVersions.getSparkVersion()).thenReturn("dev");
+ Map<String, String> sparkConf = new HashMap<>();
+ sparkConf.put("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}");
+ when(clusterSpec.getSparkConf()).thenReturn(sparkConf);
}
@Test
@@ -70,4 +77,19 @@ class SparkClusterSubmissionWorkerTest {
assertNotNull(spec.getWorkerStatefulSet());
assertNotNull(spec.getHorizontalPodAutoscaler());
}
+
+ @Test
+ void supportSparkVersionPlaceHolder() {
+ SparkClusterSubmissionWorker worker = new SparkClusterSubmissionWorker();
+ SparkClusterResourceSpec spec = worker.getResourceSpec(cluster, Collections.emptyMap());
+ assertEquals(
+ "apache/spark:dev",
+ spec.getMasterStatefulSet()
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getContainers()
+ .get(0)
+ .getImage());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]