This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 48ad3c6 [SPARK-53874] `SparkAppDriverConf` should respect `sparkVersion` of `SparkApplication` CRD
48ad3c6 is described below
commit 48ad3c6f6eed59f444839f84e58149c469b3abe2
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Oct 11 00:37:53 2025 -0700
[SPARK-53874] `SparkAppDriverConf` should respect `sparkVersion` of `SparkApplication` CRD
### What changes were proposed in this pull request?
This PR aims to fix `SparkAppDriverConf` to respect the `sparkVersion`
field of the `SparkApplication` CRD.
### Why are the changes needed?
This is a long-standing bug dating back to the initial implementation.
- https://github.com/apache/spark-kubernetes-operator/pull/10
Since the Apache Spark K8s Operator can launch various Spark versions,
the `spark-version` label should come from the `SparkApplication` CRD's
`sparkVersion` field.
Currently, however, the Spark version of the operator's compile-time
dependency is used for `Driver` resources (like the `Driver Pod` and
`Driver Service`). We should override this.
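For illustration, here is a minimal sketch (not part of this patch) of how the fixed API propagates the CRD version into the driver labels. It uses a Mockito mock like the unit tests below; the version string, app id, and main class are made-up values:

```java
import static org.mockito.Mockito.mock;

import scala.Option;

import org.apache.spark.SparkConf;
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
import org.apache.spark.k8s.operator.SparkAppDriverConf;

class SparkVersionLabelSketch {
  public static void main(String[] args) {
    // The version now comes from the SparkApplication CRD's sparkVersion field
    // instead of the operator's compile-time Spark dependency.
    SparkAppDriverConf conf =
        SparkAppDriverConf.create(
            new SparkConf(),
            "3.5.3",                          // hypothetical CRD sparkVersion
            "spark-app-id",                   // hypothetical app id
            mock(JavaMainAppResource.class),
            "org.example.Main",               // hypothetical main class
            null,
            Option.empty());
    // Driver resources (Driver Pod, Driver Service) now carry the requested
    // version: spark-version -> 3.5.3
    System.out.println(conf.labels().get("spark-version").get());
  }
}
```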
### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix to use the correct version information.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #385 from dongjoon-hyun/SPARK-53874.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/k8s/operator/SparkAppDriverConf.java | 20 +++++++++++-
.../k8s/operator/SparkAppSubmissionWorker.java | 4 +++
.../spark/k8s/operator/SparkAppDriverConfTest.java | 36 ++++++++++++++++++++--
.../k8s/operator/SparkAppSubmissionWorkerTest.java | 30 +++++++++---------
4 files changed, 72 insertions(+), 18 deletions(-)
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
index 15f3a02..a4cb03a 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
@@ -19,6 +19,8 @@
package org.apache.spark.k8s.operator;
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
+
import scala.Option;
import org.apache.spark.SparkConf;
@@ -30,14 +32,18 @@ import org.apache.spark.deploy.k8s.submit.MainAppResource;
/** Spark application driver configuration. */
public final class SparkAppDriverConf extends KubernetesDriverConf {
+ private final String sparkVersion;
+
private SparkAppDriverConf(
SparkConf sparkConf,
+ String sparkVersion,
String appId,
MainAppResource mainAppResource,
String mainClass,
String[] appArgs,
Option<String> proxyUser) {
super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null);
+ this.sparkVersion = sparkVersion;
}
/**
@@ -53,6 +59,7 @@ public final class SparkAppDriverConf extends KubernetesDriverConf {
*/
public static SparkAppDriverConf create(
SparkConf sparkConf,
+ String sparkVersion,
String appId,
MainAppResource mainAppResource,
String mainClass,
@@ -61,7 +68,8 @@ public final class SparkAppDriverConf extends KubernetesDriverConf {
// pre-create check only
KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX());
- return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser);
+ return new SparkAppDriverConf(
+ sparkConf, sparkVersion, appId, mainAppResource, mainClass, appArgs, proxyUser);
}
/**
@@ -74,6 +82,16 @@ public final class SparkAppDriverConf extends KubernetesDriverConf {
return appId();
}
+ /**
+ * Returns the driver label key and value map.
+ *
+ * @return The label key-value pair map.
+ */
+ @Override
+ public scala.collection.immutable.Map<String, String> labels() {
+ return super.labels().updated(LABEL_SPARK_VERSION_NAME, sparkVersion);
+ }
+
/**
* Creates the name to be used by the driver config map. The name consists of `resourceNamePrefix`
* and Spark instance type (driver). Operator proposes `resourceNamePrefix` with leaves naming
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 04beda0..096f8fe 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -41,6 +41,7 @@ import org.apache.spark.deploy.k8s.submit.RMainAppResource;
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
import org.apache.spark.k8s.operator.spec.ConfigMapSpec;
import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.utils.ModelUtils;
import org.apache.spark.k8s.operator.utils.StringUtils;
@@ -158,8 +159,11 @@ public class SparkAppSubmissionWorker {
sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
String appId = generateSparkAppId(app);
effectiveSparkConf.setIfMissing("spark.app.id", appId);
+ RuntimeVersions versions = applicationSpec.getRuntimeVersions();
+ String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
return SparkAppDriverConf.create(
effectiveSparkConf,
+ sparkVersion,
effectiveSparkConf.getAppId(),
primaryResource,
applicationSpec.getMainClass(),
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
index 6940515..fd94d3a 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
@@ -33,6 +33,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
class SparkAppDriverConfTest {
+ static final String VERSION = "dev";
+
@Test
void testResourceNamePrefix() {
// Resource prefix shall be deterministic per SparkApp per attempt
@@ -42,7 +44,13 @@ class SparkAppDriverConfTest {
String appId = UUID.randomUUID().toString();
SparkAppDriverConf sparkAppDriverConf =
SparkAppDriverConf.create(
- sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
+ sparkConf,
+ VERSION,
+ appId,
+ mock(JavaMainAppResource.class),
+ "foo",
+ null,
+ Option.empty());
String resourcePrefix = sparkAppDriverConf.resourceNamePrefix();
assertEquals(
resourcePrefix,
@@ -65,10 +73,34 @@ class SparkAppDriverConfTest {
String appId = "a".repeat(1000);
SparkAppDriverConf sparkAppDriverConf =
SparkAppDriverConf.create(
- sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
+ sparkConf,
+ VERSION,
+ appId,
+ mock(JavaMainAppResource.class),
+ "foo",
+ null,
+ Option.empty());
String configMapNameDriver = sparkAppDriverConf.configMapNameDriver();
assertTrue(
configMapNameDriver.length() <= 253,
"config map name length should always comply k8s DNS subdomain
length");
}
+
+ @Test
+ void testLabels() {
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.set("foo", "bar");
+ sparkConf.set("spark.executor.instances", "1");
+ String appId = "a".repeat(1000);
+ SparkAppDriverConf sparkAppDriverConf =
+ SparkAppDriverConf.create(
+ sparkConf,
+ VERSION,
+ appId,
+ mock(JavaMainAppResource.class),
+ "foo",
+ null,
+ Option.empty());
+ assertEquals(VERSION, sparkAppDriverConf.labels().get("spark-version").get());
+ }
}
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index a1abde1..0dcc43b 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -76,7 +76,7 @@ class SparkAppSubmissionWorkerTest {
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, overrides);
- assertEquals(6, constructorArgs.get(conf).size());
+ assertEquals(7, constructorArgs.get(conf).size());
// validate SparkConf with override
assertInstanceOf(SparkConf.class, constructorArgs.get(conf).get(0));
@@ -90,14 +90,14 @@ class SparkAppSubmissionWorkerTest {
"namespace from CR takes highest precedence");
// validate main resources
- assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(2));
- JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(2);
+ assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(3));
+ JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(3);
assertTrue(mainResource.primaryResource().isEmpty());
- assertEquals("foo-class", constructorArgs.get(conf).get(3));
+ assertEquals("foo-class", constructorArgs.get(conf).get(4));
- assertInstanceOf(String[].class, constructorArgs.get(conf).get(4));
- String[] capturedArgs = (String[]) constructorArgs.get(conf).get(4);
+ assertInstanceOf(String[].class, constructorArgs.get(conf).get(5));
+ String[] capturedArgs = (String[]) constructorArgs.get(conf).get(5);
assertEquals(2, capturedArgs.length);
assertEquals("a", capturedArgs[0]);
assertEquals("b", capturedArgs[1]);
@@ -120,11 +120,11 @@ class SparkAppSubmissionWorkerTest {
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
- assertEquals(6, constructorArgs.get(conf).size());
+ assertEquals(7, constructorArgs.get(conf).size());
// validate main resources
- assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
- PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
+ assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3));
+ PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3);
assertEquals("foo", mainResource.primaryResource());
}
}
@@ -146,13 +146,13 @@ class SparkAppSubmissionWorkerTest {
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
- assertEquals(6, constructorArgs.get(conf).size());
+ assertEquals(7, constructorArgs.get(conf).size());
assertEquals(
"lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));
// validate main resources
- assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
- PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
+ assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3));
+ PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3);
assertEquals("main.py", mainResource.primaryResource());
}
}
@@ -173,11 +173,11 @@ class SparkAppSubmissionWorkerTest {
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
- assertEquals(6, constructorArgs.get(conf).size());
+ assertEquals(7, constructorArgs.get(conf).size());
// validate main resources
- assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(2));
- RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(2);
+ assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(3));
+ RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(3);
assertEquals("foo", mainResource.primaryResource());
}
}
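A note on the `labels()` override in this patch: Scala's `immutable.Map#updated` rebinds an existing key, so the `spark-version` entry inherited from `KubernetesDriverConf` (which reflects the compile-time Spark version) is replaced rather than duplicated. A minimal sketch of that semantics, assuming Scala 2.13 and with made-up version strings:

```java
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;

class UpdatedSemanticsSketch {
  public static void main(String[] args) {
    // Parent labels carry the compile-time Spark version (value made up).
    Map<String, String> parentLabels =
        Map$.MODULE$.<String, String>empty().updated("spark-version", "4.0.0");
    // updated() returns a copy with the key rebound, so the CRD value wins.
    Map<String, String> labels = parentLabels.updated("spark-version", "3.5.3");
    System.out.println(labels.get("spark-version").get()); // 3.5.3
    System.out.println(labels.size());                     // 1 (no duplicate key)
  }
}
```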
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]