This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dd8d13b69e8e [SPARK-53944][K8S] Support `spark.kubernetes.executor.useDriverPodIP`
dd8d13b69e8e is described below
commit dd8d13b69e8ee268d0061de8949785fe9e03d17b
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Oct 18 08:10:42 2025 -0700
[SPARK-53944][K8S] Support `spark.kubernetes.executor.useDriverPodIP`
### What changes were proposed in this pull request?
This PR aims to let `Spark Executor` pods use the `Spark Driver` pod IP
instead of the `Spark Driver`'s K8s Service, in order to bypass K8s DNS issues.
### Why are the changes needed?
K8s DNS has a known issue, with an official workaround via an `initContainer`,
because K8s assumes a pod's IP can change when the pod is restarted.
- https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#init-containers-in-use
This (SPARK-53944) provides an additional option for users to choose IP
over DNS in a K8s environment, instead of the `initContainer` workaround,
because the Apache Spark Driver is generally not supposed to be restarted. When
a Spark Driver pod terminates for some reason, it is more natural to terminate
all of its executor pods and restart the whole Spark job from the beginning.
Since the `Driver` Pod IP is automatically injected via
`SPARK_DRIVER_BIND_ADDRESS`, this PR re-uses it.
https://github.com/apache/spark/blob/8499a62fb6b1ee51f82e65c4e449ec2eae6a0cc2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala#L131-L133
https://github.com/apache/spark/blob/8499a62fb6b1ee51f82e65c4e449ec2eae6a0cc2/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L82-L86
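For illustration only (not part of this commit), here is a minimal sketch of how a
user could opt in from application code; the app name is a hypothetical placeholder,
and the same setting can equally be passed via `spark-submit --conf`:
```
import org.apache.spark.sql.SparkSession

// Opt in to the new behavior: executor pods will connect back to the driver
// pod IP (injected via SPARK_DRIVER_BIND_ADDRESS) instead of the driver's
// K8s Service.
val spark = SparkSession.builder()
  .appName("use-driver-pod-ip-example") // hypothetical app name
  .config("spark.kubernetes.executor.useDriverPodIP", "true")
  .getOrCreate()
```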
### Does this PR introduce _any_ user-facing change?
No, this is disabled by default.
### How was this patch tested?
Pass the CI with the newly added test case.
I also verified manually.
**spark.kubernetes.executor.useDriverPodIP=false**
```
... --driver-url spark://CoarseGrainedScheduler@pi-0-driver-svc.default.svc:7078 ...
```
**spark.kubernetes.executor.useDriverPodIP=true**
```
... --driver-url spark://CoarseGrainedScheduler@10.1.20.97:7078 ...
```
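The two URLs above differ only in the selected address. As a minimal standalone
sketch of that selection (a simplified, hypothetical helper; the commit itself
builds the string via `RpcEndpointAddress` in `BasicExecutorFeatureStep`):
```
object DriverUrlSketch {
  // Pick the driver pod bind address when the option is on,
  // otherwise the driver Service address.
  def driverUrl(useDriverPodIP: Boolean, bindAddress: String,
      serviceAddress: String, port: Int): String = {
    val address = if (useDriverPodIP) bindAddress else serviceAddress
    s"spark://CoarseGrainedScheduler@$address:$port"
  }

  def main(args: Array[String]): Unit = {
    // Prints spark://CoarseGrainedScheduler@pi-0-driver-svc.default.svc:7078
    println(driverUrl(false, "10.1.20.97", "pi-0-driver-svc.default.svc", 7078))
    // Prints spark://CoarseGrainedScheduler@10.1.20.97:7078
    println(driverUrl(true, "10.1.20.97", "pi-0-driver-svc.default.svc", 7078))
  }
}
```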
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52650 from dongjoon-hyun/SPARK-53944.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/deploy/k8s/Config.scala | 7 +++++++
.../deploy/k8s/features/BasicExecutorFeatureStep.scala | 7 ++++++-
.../k8s/features/BasicExecutorFeatureStepSuite.scala | 14 ++++++++++++++
3 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 8be78240e34f..3e21fc14e6e6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -120,6 +120,13 @@ private[spark] object Config extends Logging {
     .booleanConf
     .createWithDefault(false)
 
+  val KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP =
+    ConfigBuilder("spark.kubernetes.executor.useDriverPodIP")
+      .doc("If true, executor pods use Driver pod IP directly instead of Driver Service.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 308764bbe13e..13d1f1bc98a0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -49,8 +49,13 @@ private[spark] class BasicExecutorFeatureStep(
 
   private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix
 
+  private val driverAddress = if (kubernetesConf.get(KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP)) {
+    kubernetesConf.get(DRIVER_BIND_ADDRESS)
+  } else {
+    kubernetesConf.get(DRIVER_HOST_ADDRESS)
+  }
   private val driverUrl = RpcEndpointAddress(
-    kubernetesConf.get(DRIVER_HOST_ADDRESS),
+    driverAddress,
     kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 71d484b4cd00..ced1326e7938 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -305,6 +305,20 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> KubernetesTestConf.EXECUTOR_ID))
   }
 
+  test("SPARK-53944: Support spark.kubernetes.executor.useDriverPodIP") {
+    Seq((false, "localhost"), (true, "bindAddress")).foreach {
+      case (flag, address) =>
+        val conf = baseConf.clone()
+          .set(DRIVER_BIND_ADDRESS, "bindAddress")
+          .set(KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP, flag)
+        val kconf = KubernetesTestConf.createExecutorConf(sparkConf = conf)
+        val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(conf), defaultProfile)
+        val executor = step.configurePod(SparkPod.initialPod())
+        checkEnv(executor, conf, Map(
+          ENV_DRIVER_URL -> s"spark://CoarseGrainedScheduler@$address:7098"))
+    }
+  }
+
   test("test executor pyspark memory") {
     baseConf.set("spark.kubernetes.resource.type", "python")
     baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]