This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ef08fd  [SPARK-53911] Support `SPARK_VERSION` placeholder in 
container image names
3ef08fd is described below

commit 3ef08fd56ac7de945d5139d7f6da3f86c05c9dd1
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Oct 14 19:43:40 2025 -0700

    [SPARK-53911] Support `SPARK_VERSION` placeholder in container image names
    
    ### What changes were proposed in this pull request?
    
    This PR allows users to use `{{SPARK_VERSION}}` in the following three 
configs.
    
    ```
    spark.kubernetes.container.image
    spark.kubernetes.driver.container.image
    spark.kubernetes.executor.container.image
    ```
    
    ### Why are the changes needed?
    
    We should implement this in `Apache Spark K8s Operator` for feature parity 
with Apache Spark 4.1.0. Otherwise, the placeholder would be replaced with the 
version of the compile-time Spark dependency, `4.1.0-preview2`, instead of the 
value of `RuntimeVersions.sparkVersion`.
    - https://github.com/apache/spark/pull/51592
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, because `{{SPARK_VERSION}}` is a newly supported pattern.
    
    ### How was this patch tested?
    
    Pass the CIs with newly added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #391 from dongjoon-hyun/SPARK-53911.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../k8s/operator/SparkAppSubmissionWorker.java     | 10 +++++---
 .../k8s/operator/SparkClusterSubmissionWorker.java |  8 +++++++
 .../k8s/operator/SparkAppSubmissionWorkerTest.java | 28 +++++++++++++++++++++-
 .../operator/SparkClusterSubmissionWorkerTest.java | 24 ++++++++++++++++++-
 4 files changed, 65 insertions(+), 5 deletions(-)

diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 096f8fe..ddda781 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -124,10 +124,16 @@ public class SparkAppSubmissionWorker {
   protected SparkAppDriverConf buildDriverConf(
       SparkApplication app, Map<String, String> confOverrides) {
     ApplicationSpec applicationSpec = app.getSpec();
+    RuntimeVersions versions = applicationSpec.getRuntimeVersions();
+    String sparkVersion = (versions != null) ? versions.getSparkVersion() : 
"UNKNOWN";
     SparkConf effectiveSparkConf = new SparkConf();
     if (!applicationSpec.getSparkConf().isEmpty()) {
       for (String confKey : applicationSpec.getSparkConf().keySet()) {
-        effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+        String value = applicationSpec.getSparkConf().get(confKey);
+        if (confKey.startsWith("spark.kubernetes.") && 
confKey.endsWith("container.image")) {
+          value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+        }
+        effectiveSparkConf.set(confKey, value);
       }
     }
     if (!confOverrides.isEmpty()) {
@@ -159,8 +165,6 @@ public class SparkAppSubmissionWorker {
         sparkMasterUrlPrefix + 
"https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
     String appId = generateSparkAppId(app);
     effectiveSparkConf.setIfMissing("spark.app.id", appId);
-    RuntimeVersions versions = applicationSpec.getRuntimeVersions();
-    String sparkVersion = (versions != null) ? versions.getSparkVersion() : 
"UNKNOWN";
     return SparkAppDriverConf.create(
         effectiveSparkConf,
         sparkVersion,
diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java
index 94760ff..38b49e3 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java
@@ -22,6 +22,7 @@ package org.apache.spark.k8s.operator;
 import java.util.Map;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 
 /** Worker for submitting Spark clusters. */
 public class SparkClusterSubmissionWorker {
@@ -34,12 +35,19 @@ public class SparkClusterSubmissionWorker {
    */
   public SparkClusterResourceSpec getResourceSpec(
       SparkCluster cluster, Map<String, String> confOverrides) {
+    RuntimeVersions versions = cluster.getSpec().getRuntimeVersions();
+    String sparkVersion = (versions != null) ? versions.getSparkVersion() : 
"UNKNOWN";
     SparkConf effectiveSparkConf = new SparkConf();
 
     Map<String, String> confFromSpec = cluster.getSpec().getSparkConf();
     if (!confFromSpec.isEmpty()) {
       for (Map.Entry<String, String> entry : confFromSpec.entrySet()) {
         effectiveSparkConf.set(entry.getKey(), entry.getValue());
+        String value = entry.getValue();
+        if ("spark.kubernetes.container.image".equals(entry.getKey())) {
+          value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+        }
+        effectiveSparkConf.set(entry.getKey(), value);
       }
     }
 
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index 0dcc43b..6d080cf 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -44,6 +44,7 @@ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
 import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
 import org.apache.spark.deploy.k8s.submit.RMainAppResource;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.k8s.operator.status.AttemptInfo;
@@ -260,6 +261,31 @@ class SparkAppSubmissionWorkerTest {
 
     SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
     SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
Collections.emptyMap());
-    assertEquals(conf.appId(), "foo");
+    assertEquals("foo", conf.appId());
+  }
+
+  @Test
+  void supportSparkVersionPlaceHolder() {
+    SparkApplication mockApp = mock(SparkApplication.class);
+    ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+    RuntimeVersions mockRuntimeVersions = mock(RuntimeVersions.class);
+    Map<String, String> appProps = new HashMap<>();
+    appProps.put("spark.kubernetes.container.image", 
"apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.driver.container.image", 
"apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.executor.container.image", 
"apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.key", "apache/spark:{{SPARK_VERSION}}");
+    ObjectMeta appMeta = new 
ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+    when(mockSpec.getSparkConf()).thenReturn(appProps);
+    when(mockApp.getSpec()).thenReturn(mockSpec);
+    when(mockApp.getMetadata()).thenReturn(appMeta);
+    when(mockSpec.getRuntimeVersions()).thenReturn(mockRuntimeVersions);
+    when(mockRuntimeVersions.getSparkVersion()).thenReturn("dev");
+
+    SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+    SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
Collections.emptyMap());
+    assertEquals("apache/spark:dev", 
conf.get("spark.kubernetes.container.image"));
+    assertEquals("apache/spark:dev", 
conf.get("spark.kubernetes.driver.container.image"));
+    assertEquals("apache/spark:dev", 
conf.get("spark.kubernetes.executor.container.image"));
+    assertEquals("apache/spark:{{SPARK_VERSION}}", 
conf.get("spark.kubernetes.key"));
   }
 }
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index 6418292..3391755 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -23,6 +23,8 @@ import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import org.junit.jupiter.api.BeforeEach;
@@ -41,7 +43,7 @@ class SparkClusterSubmissionWorkerTest {
   ClusterTolerations clusterTolerations = new ClusterTolerations();
   MasterSpec masterSpec;
   WorkerSpec workerSpec;
-  RuntimeVersions runtimeVersions = new RuntimeVersions();
+  RuntimeVersions runtimeVersions;
 
   @BeforeEach
   void setUp() {
@@ -50,6 +52,7 @@ class SparkClusterSubmissionWorkerTest {
     clusterSpec = mock(ClusterSpec.class);
     masterSpec = mock(MasterSpec.class);
     workerSpec = mock(WorkerSpec.class);
+    runtimeVersions = mock(RuntimeVersions.class);
     when(cluster.getMetadata()).thenReturn(objectMeta);
     when(cluster.getSpec()).thenReturn(clusterSpec);
     when(objectMeta.getNamespace()).thenReturn("my-namespace");
@@ -58,6 +61,10 @@ class SparkClusterSubmissionWorkerTest {
     when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
     when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
     when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
+    when(runtimeVersions.getSparkVersion()).thenReturn("dev");
+    Map<String, String> sparkConf = new HashMap<>();
+    sparkConf.put("spark.kubernetes.container.image", 
"apache/spark:{{SPARK_VERSION}}");
+    when(clusterSpec.getSparkConf()).thenReturn(sparkConf);
   }
 
   @Test
@@ -70,4 +77,19 @@ class SparkClusterSubmissionWorkerTest {
     assertNotNull(spec.getWorkerStatefulSet());
     assertNotNull(spec.getHorizontalPodAutoscaler());
   }
+
+  @Test
+  void supportSparkVersionPlaceHolder() {
+    SparkClusterSubmissionWorker worker = new SparkClusterSubmissionWorker();
+    SparkClusterResourceSpec spec = worker.getResourceSpec(cluster, 
Collections.emptyMap());
+    assertEquals(
+        "apache/spark:dev",
+        spec.getMasterStatefulSet()
+            .getSpec()
+            .getTemplate()
+            .getSpec()
+            .getContainers()
+            .get(0)
+            .getImage());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to