This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b6993cbbcaa0 [SPARK-53516][SDP] Fix `spark.api.mode` arg process in SparkPipelines
b6993cbbcaa0 is described below

commit b6993cbbcaa0b175019115c665fc081c3d65a7ed
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Tue Sep 23 08:58:53 2025 -0700

    [SPARK-53516][SDP] Fix `spark.api.mode` arg process in SparkPipelines

    ### What changes were proposed in this pull request?

    This PR fixes the following issues in how SparkPipelines processes the `spark.api.mode` argument:

    - Trim the value of `spark.api.mode` before evaluating it
    - Treat the value of `spark.api.mode` as case-insensitive
    - Support both `-c spark.api.mode=xxx` and `--conf spark.api.mode=xxx`
    - Avoid duplicated `--conf spark.api.mode=connect` args in the final generated commands

    ### Why are the changes needed?

    Bug fix.

    ### Does this PR introduce _any_ user-facing change?

    No, SDP is an unreleased feature.

    ### How was this patch tested?

    A unit test is added.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #52261 from pan3793/SPARK-53516.

    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Sandy Ryza <sandy.r...@databricks.com>
---
 .../org/apache/spark/deploy/SparkPipelines.scala   | 23 ++++++----
 .../apache/spark/deploy/SparkPipelinesSuite.scala  | 66 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 10 deletions(-)
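As a quick illustration of the validation this patch introduces, here is a minimal self-contained Scala sketch. It is for this email only, not part of the patch: the `ApiModeCheck` object and its `rejects` helper are hypothetical names, while the real check lives in `SparkPipelines.splitArgs` and reads the config key from `SparkLauncher.SPARK_API_MODE`.

import java.util.Locale

// Hypothetical standalone mirror of the patched check, for illustration only.
object ApiModeCheck {
  private val SparkApiMode = "spark.api.mode" // value of SparkLauncher.SPARK_API_MODE

  // True when a `--conf`/`-c` pair sets spark.api.mode to anything other than
  // "connect", after trimming whitespace and ignoring case.
  def rejects(opt: String, value: String): Boolean =
    (opt == "--conf" || opt == "-c") &&
      value.startsWith(s"$SparkApiMode=") &&
      value.stripPrefix(s"$SparkApiMode=").trim.toLowerCase(Locale.ROOT) != "connect"

  def main(args: Array[String]): Unit = {
    assert(!rejects("--conf", "spark.api.mode=connect"))   // exact match passes
    assert(!rejects("-c", "spark.api.mode= CoNNect"))      // trimmed and case-insensitive
    assert(rejects("--conf", "spark.api.mode=classic"))    // anything else is rejected
    println("all checks passed")
  }
}

The duplicate-arg issue is handled separately in the diff below: a valid user-supplied `--conf spark.api.mode=connect` pair is consumed by the new branch rather than forwarded, and the canonical pair is appended exactly once at the end of the generated spark-submit args.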
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
index 8c96598e7a9d..713937cadabf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
@@ -18,12 +18,14 @@ package org.apache.spark.deploy
 
 import java.util
+import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkUserAppException
 import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkLauncher.SPARK_API_MODE
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.util.SparkExitCode
 
@@ -63,16 +65,17 @@ object SparkPipelines extends Logging {
       if (opt == "--remote") {
         remote = value
       } else if (opt == "--class") {
-        logInfo("--class argument not supported.")
-        throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
-      } else if (opt == "--conf" &&
-        value.startsWith("spark.api.mode=") &&
-        value != "spark.api.mode=connect") {
-        logInfo(
-          "--spark.api.mode must be 'connect'. " +
-            "Declarative Pipelines currently only supports Spark Connect."
-        )
+        logError("--class argument not supported.")
         throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+      } else if ((opt == "--conf" || opt == "-c") && value.startsWith(s"$SPARK_API_MODE=")) {
+        val apiMode = value.stripPrefix(s"$SPARK_API_MODE=").trim
+        if (apiMode.toLowerCase(Locale.ROOT) != "connect") {
+          logError(
+            s"$SPARK_API_MODE must be 'connect' (was '$apiMode'). " +
+              "Declarative Pipelines currently only supports Spark Connect."
+          )
+          throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+        }
       } else if (Seq("--name", "-h", "--help").contains(opt)) {
         pipelinesArgs += opt
         if (value != null && value.nonEmpty) {
@@ -99,7 +102,7 @@ object SparkPipelines extends Logging {
     }
 
     sparkSubmitArgs += "--conf"
-    sparkSubmitArgs += "spark.api.mode=connect"
+    sparkSubmitArgs += s"$SPARK_API_MODE=connect"
     sparkSubmitArgs += "--remote"
     sparkSubmitArgs += remote
     (sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
index 2d6a1e083604..55f59d7c856d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
@@ -117,6 +117,72 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
     }
   }
 
+  test("spark.api.mode arg") {
+    var args = Array("--conf", "spark.api.mode=classic")
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+    args = Array("-c", "spark.api.mode=classic")
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+    args = Array("--conf", "spark.api.mode=CONNECT")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode=CoNNect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode=connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode= connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("-c", "spark.api.mode=connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+  }
+
   test("name arg") {
     val args = Array(
       "init",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org