This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 48ad3c6  [SPARK-53874] `SparkAppDriverConf` should respect 
`sparkVersion` of `SparkApplication` CRD
48ad3c6 is described below

commit 48ad3c6f6eed59f444839f84e58149c469b3abe2
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Oct 11 00:37:53 2025 -0700

    [SPARK-53874] `SparkAppDriverConf` should respect `sparkVersion` of 
`SparkApplication` CRD
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix `SparkAppDriverConf` to respect `sparkVersion` of 
`SparkApplication` CRD.
    
    ### Why are the changes needed?
    
    This is a long-standing bug from the initial implementation.
    - https://github.com/apache/spark-kubernetes-operator/pull/10
    
    Since Apache Spark K8s Operator can launch various Spark versions, 
`spark-version` label should come from `SparkApplication` CRD's `sparkVersion` 
field.
    
    However, currently, the Spark version of the compile dependency is used for 
`Driver` resources (like `Driver Pod` and `Driver Service`). We should override 
this.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this is a bug fix to use the correct version information.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #385 from dongjoon-hyun/SPARK-53874.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/k8s/operator/SparkAppDriverConf.java     | 20 +++++++++++-
 .../k8s/operator/SparkAppSubmissionWorker.java     |  4 +++
 .../spark/k8s/operator/SparkAppDriverConfTest.java | 36 ++++++++++++++++++++--
 .../k8s/operator/SparkAppSubmissionWorkerTest.java | 30 +++++++++---------
 4 files changed, 72 insertions(+), 18 deletions(-)

diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
index 15f3a02..a4cb03a 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
@@ -19,6 +19,8 @@
 
 package org.apache.spark.k8s.operator;
 
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
+
 import scala.Option;
 
 import org.apache.spark.SparkConf;
@@ -30,14 +32,18 @@ import org.apache.spark.deploy.k8s.submit.MainAppResource;
 
 /** Spark application driver configuration. */
 public final class SparkAppDriverConf extends KubernetesDriverConf {
+  private final String sparkVersion;
+
   private SparkAppDriverConf(
       SparkConf sparkConf,
+      String sparkVersion,
       String appId,
       MainAppResource mainAppResource,
       String mainClass,
       String[] appArgs,
       Option<String> proxyUser) {
     super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, 
null);
+    this.sparkVersion = sparkVersion;
   }
 
   /**
@@ -53,6 +59,7 @@ public final class SparkAppDriverConf extends 
KubernetesDriverConf {
    */
   public static SparkAppDriverConf create(
       SparkConf sparkConf,
+      String sparkVersion,
       String appId,
       MainAppResource mainAppResource,
       String mainClass,
@@ -61,7 +68,8 @@ public final class SparkAppDriverConf extends 
KubernetesDriverConf {
     // pre-create check only
     KubernetesVolumeUtils.parseVolumesWithPrefix(
         sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX());
-    return new SparkAppDriverConf(sparkConf, appId, mainAppResource, 
mainClass, appArgs, proxyUser);
+    return new SparkAppDriverConf(
+        sparkConf, sparkVersion, appId, mainAppResource, mainClass, appArgs, 
proxyUser);
   }
 
   /**
@@ -74,6 +82,16 @@ public final class SparkAppDriverConf extends 
KubernetesDriverConf {
     return appId();
   }
 
+  /**
+   * Returns the driver label key and value map.
+   *
+   * @return The label key-value pair map.
+   */
+  @Override
+  public scala.collection.immutable.Map<String, String> labels() {
+    return super.labels().updated(LABEL_SPARK_VERSION_NAME, sparkVersion);
+  }
+
   /**
    * Creates the name to be used by the driver config map. The name consists 
of `resourceNamePrefix`
    * and Spark instance type (driver). Operator proposes `resourceNamePrefix` 
with leaves naming
diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 04beda0..096f8fe 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -41,6 +41,7 @@ import org.apache.spark.deploy.k8s.submit.RMainAppResource;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
 import org.apache.spark.k8s.operator.spec.ConfigMapSpec;
 import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.utils.ModelUtils;
 import org.apache.spark.k8s.operator.utils.StringUtils;
 
@@ -158,8 +159,11 @@ public class SparkAppSubmissionWorker {
         sparkMasterUrlPrefix + 
"https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
     String appId = generateSparkAppId(app);
     effectiveSparkConf.setIfMissing("spark.app.id", appId);
+    RuntimeVersions versions = applicationSpec.getRuntimeVersions();
+    String sparkVersion = (versions != null) ? versions.getSparkVersion() : 
"UNKNOWN";
     return SparkAppDriverConf.create(
         effectiveSparkConf,
+        sparkVersion,
         effectiveSparkConf.getAppId(),
         primaryResource,
         applicationSpec.getMainClass(),
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
index 6940515..fd94d3a 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
@@ -33,6 +33,8 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
 
 class SparkAppDriverConfTest {
+  static final String VERSION = "dev";
+
   @Test
   void testResourceNamePrefix() {
     // Resource prefix shall be deterministic per SparkApp per attempt
@@ -42,7 +44,13 @@ class SparkAppDriverConfTest {
     String appId = UUID.randomUUID().toString();
     SparkAppDriverConf sparkAppDriverConf =
         SparkAppDriverConf.create(
-            sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, 
Option.empty());
+            sparkConf,
+            VERSION,
+            appId,
+            mock(JavaMainAppResource.class),
+            "foo",
+            null,
+            Option.empty());
     String resourcePrefix = sparkAppDriverConf.resourceNamePrefix();
     assertEquals(
         resourcePrefix,
@@ -65,10 +73,34 @@ class SparkAppDriverConfTest {
     String appId = "a".repeat(1000);
     SparkAppDriverConf sparkAppDriverConf =
         SparkAppDriverConf.create(
-            sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, 
Option.empty());
+            sparkConf,
+            VERSION,
+            appId,
+            mock(JavaMainAppResource.class),
+            "foo",
+            null,
+            Option.empty());
     String configMapNameDriver = sparkAppDriverConf.configMapNameDriver();
     assertTrue(
         configMapNameDriver.length() <= 253,
         "config map name length should always comply k8s DNS subdomain 
length");
   }
+
+  @Test
+  void testLabels() {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.set("foo", "bar");
+    sparkConf.set("spark.executor.instances", "1");
+    String appId = "a".repeat(1000);
+    SparkAppDriverConf sparkAppDriverConf =
+        SparkAppDriverConf.create(
+            sparkConf,
+            VERSION,
+            appId,
+            mock(JavaMainAppResource.class),
+            "foo",
+            null,
+            Option.empty());
+    assertEquals(VERSION, 
sparkAppDriverConf.labels().get("spark-version").get());
+  }
 }
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index a1abde1..0dcc43b 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -76,7 +76,7 @@ class SparkAppSubmissionWorkerTest {
 
       SparkAppSubmissionWorker submissionWorker = new 
SparkAppSubmissionWorker();
       SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
overrides);
-      assertEquals(6, constructorArgs.get(conf).size());
+      assertEquals(7, constructorArgs.get(conf).size());
 
       // validate SparkConf with override
       assertInstanceOf(SparkConf.class, constructorArgs.get(conf).get(0));
@@ -90,14 +90,14 @@ class SparkAppSubmissionWorkerTest {
           "namespace from CR takes highest precedence");
 
       // validate main resources
-      assertInstanceOf(JavaMainAppResource.class, 
constructorArgs.get(conf).get(2));
-      JavaMainAppResource mainResource = (JavaMainAppResource) 
constructorArgs.get(conf).get(2);
+      assertInstanceOf(JavaMainAppResource.class, 
constructorArgs.get(conf).get(3));
+      JavaMainAppResource mainResource = (JavaMainAppResource) 
constructorArgs.get(conf).get(3);
       assertTrue(mainResource.primaryResource().isEmpty());
 
-      assertEquals("foo-class", constructorArgs.get(conf).get(3));
+      assertEquals("foo-class", constructorArgs.get(conf).get(4));
 
-      assertInstanceOf(String[].class, constructorArgs.get(conf).get(4));
-      String[] capturedArgs = (String[]) constructorArgs.get(conf).get(4);
+      assertInstanceOf(String[].class, constructorArgs.get(conf).get(5));
+      String[] capturedArgs = (String[]) constructorArgs.get(conf).get(5);
       assertEquals(2, capturedArgs.length);
       assertEquals("a", capturedArgs[0]);
       assertEquals("b", capturedArgs[1]);
@@ -120,11 +120,11 @@ class SparkAppSubmissionWorkerTest {
 
       SparkAppSubmissionWorker submissionWorker = new 
SparkAppSubmissionWorker();
       SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
Collections.emptyMap());
-      assertEquals(6, constructorArgs.get(conf).size());
+      assertEquals(7, constructorArgs.get(conf).size());
 
       // validate main resources
-      assertInstanceOf(PythonMainAppResource.class, 
constructorArgs.get(conf).get(2));
-      PythonMainAppResource mainResource = (PythonMainAppResource) 
constructorArgs.get(conf).get(2);
+      assertInstanceOf(PythonMainAppResource.class, 
constructorArgs.get(conf).get(3));
+      PythonMainAppResource mainResource = (PythonMainAppResource) 
constructorArgs.get(conf).get(3);
       assertEquals("foo", mainResource.primaryResource());
     }
   }
@@ -146,13 +146,13 @@ class SparkAppSubmissionWorkerTest {
 
       SparkAppSubmissionWorker submissionWorker = new 
SparkAppSubmissionWorker();
       SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
Collections.emptyMap());
-      assertEquals(6, constructorArgs.get(conf).size());
+      assertEquals(7, constructorArgs.get(conf).size());
       assertEquals(
           "lib.py", ((SparkConf) 
constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));
 
       // validate main resources
-      assertInstanceOf(PythonMainAppResource.class, 
constructorArgs.get(conf).get(2));
-      PythonMainAppResource mainResource = (PythonMainAppResource) 
constructorArgs.get(conf).get(2);
+      assertInstanceOf(PythonMainAppResource.class, 
constructorArgs.get(conf).get(3));
+      PythonMainAppResource mainResource = (PythonMainAppResource) 
constructorArgs.get(conf).get(3);
       assertEquals("main.py", mainResource.primaryResource());
     }
   }
@@ -173,11 +173,11 @@ class SparkAppSubmissionWorkerTest {
 
       SparkAppSubmissionWorker submissionWorker = new 
SparkAppSubmissionWorker();
       SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, 
Collections.emptyMap());
-      assertEquals(6, constructorArgs.get(conf).size());
+      assertEquals(7, constructorArgs.get(conf).size());
 
       // validate main resources
-      assertInstanceOf(RMainAppResource.class, 
constructorArgs.get(conf).get(2));
-      RMainAppResource mainResource = (RMainAppResource) 
constructorArgs.get(conf).get(2);
+      assertInstanceOf(RMainAppResource.class, 
constructorArgs.get(conf).get(3));
+      RMainAppResource mainResource = (RMainAppResource) 
constructorArgs.get(conf).get(3);
       assertEquals("foo", mainResource.primaryResource());
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to