This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dd8d13b69e8e [SPARK-53944][K8S] Support `spark.kubernetes.executor.useDriverPodIP`
dd8d13b69e8e is described below

commit dd8d13b69e8ee268d0061de8949785fe9e03d17b
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Oct 18 08:10:42 2025 -0700

    [SPARK-53944][K8S] Support `spark.kubernetes.executor.useDriverPodIP`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to let `Spark Executor` pods use the `Spark Driver` pod IP instead of the `Spark Driver`'s K8s Service, in order to bypass K8s DNS issues.
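    
    For illustration, a user would enable the new behavior at submission time like this (a hedged sketch; the API server address and container image are placeholders):
    
    ```
    bin/spark-submit \
      --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
      --deploy-mode cluster \
      --conf spark.kubernetes.container.image=<spark-image> \
      --conf spark.kubernetes.executor.useDriverPodIP=true \
      ...
    ```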
    
    ### Why are the changes needed?
    
    K8s DNS has a known issue and an official workaround via an `initContainer`, because it assumes pod IPs can change when pods restart.
    - https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#init-containers-in-use
    
    This (SPARK-53944) provides an additional option for users to choose IP over DNS in a K8s environment, instead of the `initContainer` workaround, because an Apache Spark Driver is generally not supposed to be restarted. When a Spark Driver pod terminates for some reason, it's more natural to terminate all its executor pods and restart the whole Spark job from the beginning.
    
    Since the `Driver` Pod IP is automatically injected via `SPARK_DRIVER_BIND_ADDRESS`, this PR reuses it.
    
    https://github.com/apache/spark/blob/8499a62fb6b1ee51f82e65c4e449ec2eae6a0cc2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala#L131-L133
    
    https://github.com/apache/spark/blob/8499a62fb6b1ee51f82e65c4e449ec2eae6a0cc2/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L82-L86
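    
    For context, the referenced `BasicDriverFeatureStep` lines expose the driver pod IP through the Kubernetes downward API, roughly as in this sketch (paraphrased, not the verbatim source):
    
    ```
    import io.fabric8.kubernetes.api.model.{EnvVarBuilder, EnvVarSourceBuilder}
    
    // Sketch: inject the driver pod IP into the container as
    // SPARK_DRIVER_BIND_ADDRESS via a downward-API field reference.
    val driverBindAddressEnv = new EnvVarBuilder()
      .withName("SPARK_DRIVER_BIND_ADDRESS")
      .withValueFrom(new EnvVarSourceBuilder()
        .withNewFieldRef("v1", "status.podIP")
        .build())
      .build()
    ```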
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is disabled by default.
    
    ### How was this patch tested?
    
    Pass the CI with the newly added test case.
    
    I also verified manually.
    
    **spark.kubernetes.executor.useDriverPodIP=false**
    ```
    ... --driver-url spark://CoarseGrainedScheduler@pi-0-driver-svc.default.svc:7078 ...
    ```
    
    **spark.kubernetes.executor.useDriverPodIP=true**
    ```
    ... --driver-url spark://CoarseGrainedScheduler@10.1.20.97:7078 ...
    ```
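    
    The `--driver-url` values above follow the `RpcEndpointAddress` rendering used in the diff below, roughly (a minimal sketch mirroring `RpcEndpointAddress.toString`):
    
    ```
    // Sketch of the driver URL format: spark://<endpoint-name>@<host>:<port>
    def driverUrl(host: String, port: Int, name: String): String =
      s"spark://$name@$host:$port"
    
    driverUrl("10.1.20.97", 7078, "CoarseGrainedScheduler")
    // => spark://CoarseGrainedScheduler@10.1.20.97:7078
    ```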
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52650 from dongjoon-hyun/SPARK-53944.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../main/scala/org/apache/spark/deploy/k8s/Config.scala    |  7 +++++++
 .../deploy/k8s/features/BasicExecutorFeatureStep.scala     |  7 ++++++-
 .../k8s/features/BasicExecutorFeatureStepSuite.scala       | 14 ++++++++++++++
 3 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 8be78240e34f..3e21fc14e6e6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -120,6 +120,13 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP =
+    ConfigBuilder("spark.kubernetes.executor.useDriverPodIP")
+      .doc("If true, executor pods use Driver pod IP directly instead of Driver Service.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 308764bbe13e..13d1f1bc98a0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -49,8 +49,13 @@ private[spark] class BasicExecutorFeatureStep(
 
   private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix
 
+  private val driverAddress = if (kubernetesConf.get(KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP)) {
+    kubernetesConf.get(DRIVER_BIND_ADDRESS)
+  } else {
+    kubernetesConf.get(DRIVER_HOST_ADDRESS)
+  }
   private val driverUrl = RpcEndpointAddress(
-    kubernetesConf.get(DRIVER_HOST_ADDRESS),
+    driverAddress,
     kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 71d484b4cd00..ced1326e7938 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -305,6 +305,20 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> KubernetesTestConf.EXECUTOR_ID))
   }
 
+  test("SPARK-53944: Support spark.kubernetes.executor.useDriverPodIP") {
+    Seq((false, "localhost"), (true, "bindAddress")).foreach {
+      case (flag, address) =>
+        val conf = baseConf.clone()
+          .set(DRIVER_BIND_ADDRESS, "bindAddress")
+          .set(KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP, flag)
+        val kconf = KubernetesTestConf.createExecutorConf(sparkConf = conf)
+        val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(conf), defaultProfile)
+        val executor = step.configurePod(SparkPod.initialPod())
+        checkEnv(executor, conf, Map(
+          ENV_DRIVER_URL -> s"spark://CoarseGrainedScheduler@$address:7098"))
+    }
+  }
+
   test("test executor pyspark memory") {
     baseConf.set("spark.kubernetes.resource.type", "python")
     baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)

