This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 516cb7f6533 [SPARK-42190][K8S][FOLLOWUP] Fix to use the user-given number of threads
516cb7f6533 is described below

commit 516cb7f6533d028dff1e7e54806c7333e8420019
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Jan 27 02:00:27 2023 -0800

    [SPARK-42190][K8S][FOLLOWUP] Fix to use the user-given number of threads
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up PR that fixes the code to use the user-given number of threads.
    
    ### Why are the changes needed?
    
    Previously, it always used `1` thread due to this bug.
    
    After this fix, users can see the following log message:
    ```
    23/01/27 08:42:45 INFO KubernetesClusterManager: Running Spark with local[10]
    ```
    ![Screenshot 2023-01-27 at 12 43 18 AM](https://user-images.githubusercontent.com/9700541/215044492-fb432000-2eab-4520-b99f-79d7c9eb807f.png)
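
    As an illustration, here is a minimal standalone sketch of the fixed thread-count resolution. The regex patterns below are assumptions modeled on Spark core's `SparkMasterRegex`, not quotes from this patch:
    ```scala
    object ThreadCountSketch {
      // Assumed patterns, modeled on org.apache.spark.SparkMasterRegex:
      // they match "local[N]"/"local[*]" and "local[N, maxFailures]".
      val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
      val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

      // Mirrors the fixed logic: honor the user-given thread count and fall
      // back to all available cores only for "local[*]".
      def threadCount(master: String): Int = master match {
        case LOCAL_N_REGEX(threads) =>
          if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
        case LOCAL_N_FAILURES_REGEX(threads, _) =>
          if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
        case _ => 1
      }
    }
    ```
    With this, `ThreadCountSketch.threadCount("local[10]")` yields `10` instead of the previously hard-coded `1`.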
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is not released yet.
    
    ### How was this patch tested?
    
    Pass the CI with the newly added test case.
    
    Closes #39769 from dongjoon-hyun/SPARK-42190-2.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/scheduler/cluster/k8s/KubernetesClusterManager.scala   | 9 +++++----
 .../spark/deploy/k8s/integrationtest/BasicTestsSuite.scala       | 4 ++--
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index c01fbf4f9c7..fb0783239c6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -40,7 +40,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     conf.get(KUBERNETES_DRIVER_MASTER_URL).startsWith("local")
 
   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    val maxTaskFailures = masterURL match {
+    val maxTaskFailures = sc.conf.get(KUBERNETES_DRIVER_MASTER_URL) match {
       case "local" | LOCAL_N_REGEX(_) => 1
       case LOCAL_N_FAILURES_REGEX(_, maxFailures) => maxFailures.toInt
       case _ => sc.conf.get(TASK_MAX_FAILURES)
@@ -54,13 +54,14 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       scheduler: TaskScheduler): SchedulerBackend = {
     if (isLocal(sc.conf)) {
       def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
-      val threadCount = masterURL match {
+      val threadCount = sc.conf.get(KUBERNETES_DRIVER_MASTER_URL) match {
         case LOCAL_N_REGEX(threads) =>
-          if (threads == "*") localCpuCount else 1
+          if (threads == "*") localCpuCount else threads.toInt
         case LOCAL_N_FAILURES_REGEX(threads, _) =>
-          if (threads == "*") localCpuCount else 1
+          if (threads == "*") localCpuCount else threads.toInt
         case _ => 1
       }
+      logInfo(s"Running Spark with ${sc.conf.get(KUBERNETES_DRIVER_MASTER_URL)}")
       val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
       val backend = new LocalSchedulerBackend(sc.conf, schedulerImpl, threadCount)
       schedulerImpl.initialize(backend)
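
A note on the hunk above: in K8s mode, the `masterURL` argument passed to `createTaskScheduler` is the cluster-manager URL (a `k8s://...` string), which can never match the `local[...]` patterns, so the user-given value must be read from `spark.kubernetes.driver.master` instead. A minimal sketch of that distinction, using assumed illustrative values:

```scala
object MasterUrlNote extends App {
  // Hypothetical values, for illustration only.
  val masterURL = "k8s://https://kubernetes.default.svc"  // cluster-manager URL
  val driverMaster = "local[10]"                          // spark.kubernetes.driver.master

  // Assumed pattern, modeled on org.apache.spark.SparkMasterRegex.
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r

  assert(LOCAL_N_REGEX.findFirstIn(masterURL).isEmpty)      // never matches
  assert(LOCAL_N_REGEX.findFirstIn(driverMaster).isDefined) // matches "local[10]"
}
```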
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 6ae000c80c5..1f48d796067 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -34,11 +34,11 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
   import KubernetesSuite.{TIMEOUT, INTERVAL}
 
   test("SPARK-42190: Run SparkPi with local[*]", k8sTestTag) {
-    sparkAppConf.set("spark.kubernetes.driver.master", "local[*]")
+    sparkAppConf.set("spark.kubernetes.driver.master", "local[10]")
     runSparkApplicationAndVerifyCompletion(
       containerLocalSparkDistroExamplesJar,
       SPARK_PI_MAIN_CLASS,
-      Seq("Pi is roughly 3"),
+      Seq("local[10]", "Pi is roughly 3"),
       Seq(),
       Array.empty[String],
       doBasicDriverPodCheck,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
