This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b6993cbbcaa0 [SPARK-53516][SDP] Fix `spark.api.mode` arg processing in SparkPipelines
b6993cbbcaa0 is described below

commit b6993cbbcaa0b175019115c665fc081c3d65a7ed
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Tue Sep 23 08:58:53 2025 -0700

    [SPARK-53516][SDP] Fix `spark.api.mode` arg processing in SparkPipelines
    
    ### What changes were proposed in this pull request?
    
    This PR fixes four issues (see the sketch after the list):
    
    - Trim the value of `spark.api.mode` before evaluating it
    - Treat the value of `spark.api.mode` as case-insensitive
    - Support both `-c spark.api.mode=xxx` and `--conf spark.api.mode=xxx`
    - Avoid duplicated `--conf spark.api.mode=connect` args in the final generated commands
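    
    For illustration, a minimal sketch of the normalization the fixed check applies (it mirrors the new code in `SparkPipelines.scala`; the standalone helper and its name are illustrative, not part of the patch):
    
    ```scala
    import java.util.Locale
    
    // Accepts "connect" in any letter case and tolerates surrounding
    // whitespace, e.g. "spark.api.mode=CONNECT" or "spark.api.mode= connect".
    def isConnectApiMode(conf: String): Boolean =
      conf.stripPrefix("spark.api.mode=").trim.toLowerCase(Locale.ROOT) == "connect"
    ```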
    
    ### Why are the changes needed?
    
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, SDP is an unreleased feature.
    
    ### How was this patch tested?
    
    A unit test is added in `SparkPipelinesSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52261 from pan3793/SPARK-53516.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Sandy Ryza <sandy.r...@databricks.com>
---
 .../org/apache/spark/deploy/SparkPipelines.scala   | 23 ++++----
 .../apache/spark/deploy/SparkPipelinesSuite.scala  | 66 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
index 8c96598e7a9d..713937cadabf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.deploy
 
 import java.util
+import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkUserAppException
 import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkLauncher.SPARK_API_MODE
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.util.SparkExitCode
 
@@ -63,16 +65,17 @@ object SparkPipelines extends Logging {
         if (opt == "--remote") {
           remote = value
         } else if (opt == "--class") {
-          logInfo("--class argument not supported.")
-          throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
-        } else if (opt == "--conf" &&
-          value.startsWith("spark.api.mode=") &&
-          value != "spark.api.mode=connect") {
-          logInfo(
-            "--spark.api.mode must be 'connect'. " +
-            "Declarative Pipelines currently only supports Spark Connect."
-          )
+          logError("--class argument not supported.")
           throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+        } else if ((opt == "--conf" || opt == "-c") && value.startsWith(s"$SPARK_API_MODE=")) {
+          val apiMode = value.stripPrefix(s"$SPARK_API_MODE=").trim
+          if (apiMode.toLowerCase(Locale.ROOT) != "connect") {
+            logError(
+              s"$SPARK_API_MODE must be 'connect' (was '$apiMode'). " +
+                "Declarative Pipelines currently only supports Spark Connect."
+            )
+            throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+          }
         } else if (Seq("--name", "-h", "--help").contains(opt)) {
           pipelinesArgs += opt
           if (value != null && value.nonEmpty) {
@@ -99,7 +102,7 @@ object SparkPipelines extends Logging {
     }
 
     sparkSubmitArgs += "--conf"
-    sparkSubmitArgs += "spark.api.mode=connect"
+    sparkSubmitArgs += s"$SPARK_API_MODE=connect"
     sparkSubmitArgs += "--remote"
     sparkSubmitArgs += remote
     (sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
index 2d6a1e083604..55f59d7c856d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
@@ -117,6 +117,72 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
     }
   }
 
+  test("spark.api.mode arg") {
+    var args = Array("--conf", "spark.api.mode=classic")
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+    args = Array("-c", "spark.api.mode=classic")
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+    args = Array("--conf", "spark.api.mode=CONNECT")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode=CoNNect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode=connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode= connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("-c", "spark.api.mode=connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+  }
+
   test("name arg") {
     val args = Array(
       "init",

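For reference, a usage sketch of the behavior the new suite pins down, written against the same placeholder `sparkHome = "abc"` used in the tests (calling `constructSparkSubmitArgs` outside the test suite's scope is an assumption):

```scala
import org.apache.spark.deploy.SparkPipelines

// A mixed-case user value passes the case-insensitive check and is
// dropped, so the canonical conf appears exactly once in the output.
val submitArgs = SparkPipelines.constructSparkSubmitArgs(
  Array("--conf", "spark.api.mode=CONNECT"), sparkHome = "abc")
assert(submitArgs == Seq(
  "--conf", "spark.api.mode=connect",
  "--remote", "local",
  "abc/python/pyspark/pipelines/cli.py"))
```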
