This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cf6e90c205b [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 
when they are set in spark-defaults.conf
cf6e90c205b is described below

commit cf6e90c205bd76ff9e1fc2d88757d9d44ec93162
Author: Thomas Graves <[email protected]>
AuthorDate: Thu Jun 22 17:40:47 2023 -0700

    [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 when they are set 
in spark-defaults.conf
    
    ### What changes were proposed in this pull request?
    
    https://issues.apache.org/jira/browse/SPARK-44134
    
    With resource aware scheduling, if you specify a default value in the 
spark-defaults.conf, a user can't override that to set it to 0.
    
    Meaning spark-defaults.conf has something like:
    spark.executor.resource.{resourceName}.amount=1
    spark.task.resource.{resourceName}.amount=1
    
    If the user tries to override when submitting an application with 
spark.executor.resource.{resourceName}.amount=0 and 
spark.task.resource.{resourceName}.amount=0, the application fails to submit.  
It should submit and just not try to allocate those resources. This worked back 
in Spark 3.0 but was broken when the stage level scheduling feature was added.
    
    Here I fixed it by simply removing any task resources from the list if they 
are set to 0.
    
    Note I also fixed a typo in the exception message when no executor 
resources are specified but task resources are.
    
    Note, ideally this is backported to all of the maintenance releases
    
    ### Why are the changes needed?
    
    Fix a bug described above
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes.
    
    ### How was this patch tested?
    
    Added unit test and then ran manually on standalone and YARN clusters to 
verify overriding the configs now works.
    
    Closes #41703 from tgravescs/fixResource0.
    
    Authored-by: Thomas Graves <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/resource/ResourceProfile.scala      |  2 +-
 .../scala/org/apache/spark/resource/ResourceUtils.scala  |  7 +++++--
 .../test/scala/org/apache/spark/SparkContextSuite.scala  |  2 +-
 .../org/apache/spark/resource/ResourceProfileSuite.scala | 16 ++++++++++++++++
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index afd612433a7..60c541f5b7e 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -215,7 +215,7 @@ class ResourceProfile(
       }
     }
     if (taskResourcesToCheck.nonEmpty) {
-      throw new SparkException("No executor resource configs were not 
specified for the " +
+      throw new SparkException("No executor resource configs were specified 
for the " +
         s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
     }
     val limiting =
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 6e294397a3c..d19f413598b 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -190,12 +190,15 @@ private[spark] object ResourceUtils extends Logging {
   def addTaskResourceRequests(
       sparkConf: SparkConf,
       treqs: TaskResourceRequests): Unit = {
-    listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId =>
+    val nonZeroTaskReqs = listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { 
resourceId =>
       val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
       val amountDouble = settings.getOrElse(AMOUNT,
         throw new SparkException(s"You must specify an amount for 
${resourceId.resourceName}")
       ).toDouble
-      treqs.resource(resourceId.resourceName, amountDouble)
+      (resourceId.resourceName, amountDouble)
+    }.toMap.filter { case (_, amount) => amount > 0.0 }
+    nonZeroTaskReqs.foreach { case (resourceName, amount) =>
+      treqs.resource(resourceName, amount)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 135ecbee09a..93d0d33101a 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -957,7 +957,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("No executor resource configs were not specified for 
the following " +
+    assert(error.contains("No executor resource configs were specified for the 
following " +
       "task configs: gpu"))
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index d07b85847e7..9a2e47e64ea 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -146,6 +146,22 @@ class ResourceProfileSuite extends SparkFunSuite with 
MockitoSugar {
     assert(immrprof.taskResources.get("gpu").get.amount == 0.33)
   }
 
+  test("test default profile task gpus 0") {
+    val sparkConf = new SparkConf()
+      .set(EXECUTOR_GPU_ID.amountConf, "2")
+      .set(TASK_GPU_ID.amountConf, "0")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.taskResources.get("gpu") == None)
+  }
+
+  test("test default profile executor gpus 0") {
+    val sparkConf = new SparkConf()
+      .set(EXECUTOR_GPU_ID.amountConf, "0")
+      .set(TASK_GPU_ID.amountConf, "1")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.executorResources.get("gpu") == None)
+  }
+
   test("maxTasksPerExecutor cpus") {
     val sparkConf = new SparkConf()
       .set(EXECUTOR_CORES, 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to