This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c4ffb7b31169 [SPARK-52431][SDP] Finishing touches on Declarative Pipelines runner
c4ffb7b31169 is described below
commit c4ffb7b3116900f21078f3fcf03cd8fa37f2fd1c
Author: Sandy Ryza <[email protected]>
AuthorDate: Fri Jun 13 09:39:19 2025 -0700
[SPARK-52431][SDP] Finishing touches on Declarative Pipelines runner
### What changes were proposed in this pull request?
Adds finishing touches to the Declarative Pipelines CLI entrypoint.
- Makes it behave like a standard spark-submitted application.
- Enables running `spark-pipelines` against common cluster managers.
- Enables running `spark-pipelines` without the remote flag.
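To illustrate, a rough sketch of the resulting usage (not part of this patch; the Connect address and conf value are made up, while the `run --spec` subcommand and `--remote` flag come from the CLI and tests below):

```
# Local run; no --remote flag needed (the wrapper defaults to a local remote):
bin/spark-pipelines run --spec pipeline.yml

# Run against a Spark Connect endpoint, with spark-submit args such as --conf forwarded:
bin/spark-pipelines --remote sc://localhost:15002 --conf spark.executor.memory=2g run --spec pipeline.yml
```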
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
- Added unit tests
- Ran the `spark-pipelines` CLI with various combinations of arguments
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51165 from sryza/entrypoint-ft.
Lead-authored-by: Sandy Ryza <[email protected]>
Co-authored-by: Sandy Ryza <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
bin/spark-pipelines | 2 +-
.../org/apache/spark/deploy/SparkPipelines.scala | 106 ++++++++++++++++
.../apache/spark/deploy/SparkPipelinesSuite.scala | 137 +++++++++++++++++++++
python/pyspark/pipelines/cli.py | 11 +-
4 files changed, 248 insertions(+), 8 deletions(-)
diff --git a/bin/spark-pipelines b/bin/spark-pipelines
index 52baeeafab08..2174df7bed69 100755
--- a/bin/spark-pipelines
+++ b/bin/spark-pipelines
@@ -30,4 +30,4 @@ fi
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"
-$PYSPARK_PYTHON "${SPARK_HOME}"/python/pyspark/pipelines/cli.py "$@"
+exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines
"$@"
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
new file mode 100644
index 000000000000..edad8901a030
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkUserAppException
+import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkSubmitArgumentsParser
+
+/**
+ * Outer implementation of the spark-pipelines command line interface. Responsible for routing
+ * spark-submit args to spark-submit, and pipeline-specific args to the inner Python CLI
+ * implementation that loads the user code and submits it to the backend.
+ */
+object SparkPipelines extends Logging {
+ def main(args: Array[String]): Unit = {
+ val sparkHome = sys.env("SPARK_HOME")
+ SparkSubmit.main(constructSparkSubmitArgs(args, sparkHome).toArray)
+ }
+
+ protected[deploy] def constructSparkSubmitArgs(
+ args: Array[String],
+ sparkHome: String): Seq[String] = {
+ val (sparkSubmitArgs, pipelinesArgs) = splitArgs(args)
+ val pipelinesCliFile = s"$sparkHome/python/pyspark/pipelines/cli.py"
+ (sparkSubmitArgs ++ Seq(pipelinesCliFile) ++ pipelinesArgs)
+ }
+
+ /**
+ * Split the arguments into spark-submit args (--master, --remote, etc.) and pipeline args
+ * (run, --spec, etc.).
+ */
+ private def splitArgs(args: Array[String]): (Seq[String], Seq[String]) = {
+ val sparkSubmitArgs = new ArrayBuffer[String]()
+ val pipelinesArgs = new ArrayBuffer[String]()
+ var remote = "local"
+
+ new SparkSubmitArgumentsParser() {
+ parse(util.Arrays.asList(args: _*))
+
+ override protected def handle(opt: String, value: String): Boolean = {
+ if (opt == "--remote") {
+ remote = value
+ } else if (opt == "--class") {
+ logInfo("--class argument not supported.")
+ throw SparkUserAppException(1)
+ } else if (opt == "--conf" &&
+ value.startsWith("spark.api.mode=") &&
+ value != "spark.api.mode=connect") {
+ logInfo(
+ "--spark.api.mode must be 'connect'. " +
+ "Declarative Pipelines currently only supports Spark Connect."
+ )
+ throw SparkUserAppException(1)
+ } else if (Seq("--name", "-h", "--help").contains(opt)) {
+ pipelinesArgs += opt
+ if (value != null && value.nonEmpty) {
+ pipelinesArgs += value
+ }
+ } else {
+ sparkSubmitArgs += opt
+ if (value != null) {
+ sparkSubmitArgs += value
+ }
+ }
+
+ true
+ }
+
+ override protected def handleExtraArgs(extra: util.List[String]): Unit = {
+ pipelinesArgs.appendAll(extra.asScala)
+ }
+
+ override protected def handleUnknown(opt: String): Boolean = {
+ pipelinesArgs += opt
+ true
+ }
+ }
+
+ sparkSubmitArgs += "--conf"
+ sparkSubmitArgs += "spark.api.mode=connect"
+ sparkSubmitArgs += "--remote"
+ sparkSubmitArgs += remote
+ (sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
new file mode 100644
index 000000000000..a482eaa42c35
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.must.Matchers
+
+import org.apache.spark.SparkUserAppException
+
+class SparkPipelinesSuite extends SparkSubmitTestUtils with Matchers with BeforeAndAfterEach {
+ test("only spark submit args") {
+ val args = Array(
+ "--remote",
+ "local[2]",
+ "--deploy-mode",
+ "client",
+ "--supervise",
+ "--conf",
+ "spark.conf1=2",
+ "--conf",
+ "spark.conf2=3"
+ )
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--deploy-mode",
+ "client",
+ "--supervise",
+ "--conf",
+ "spark.conf1=2",
+ "--conf",
+ "spark.conf2=3",
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local[2]",
+ "abc/python/pyspark/pipelines/cli.py"
+ )
+ )
+ }
+
+ test("only pipelines args") {
+ val args = Array(
+ "run",
+ "--spec",
+ "pipeline.yml"
+ )
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local",
+ "abc/python/pyspark/pipelines/cli.py",
+ "run",
+ "--spec",
+ "pipeline.yml"
+ )
+ )
+ }
+
+ test("spark-submit and pipelines args") {
+ val args = Array(
+ "--remote",
+ "local[2]",
+ "run",
+ "--supervise",
+ "--spec",
+ "pipeline.yml",
+ "--conf",
+ "spark.conf2=3"
+ )
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--supervise",
+ "--conf",
+ "spark.conf2=3",
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local[2]",
+ "abc/python/pyspark/pipelines/cli.py",
+ "run",
+ "--spec",
+ "pipeline.yml"
+ )
+ )
+ }
+
+ test("class arg prohibited") {
+ val args = Array(
+ "--class",
+ "org.apache.spark.deploy.SparkPipelines"
+ )
+ intercept[SparkUserAppException] {
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+ }
+ }
+
+ test("name arg") {
+ val args = Array(
+ "init",
+ "--name",
+ "myproject"
+ )
+ assert(
+ SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+ Seq(
+ "--conf",
+ "spark.api.mode=connect",
+ "--remote",
+ "local",
+ "abc/python/pyspark/pipelines/cli.py",
+ "init",
+ "--name",
+ "myproject"
+ )
+ )
+ }
+}
diff --git a/python/pyspark/pipelines/cli.py b/python/pyspark/pipelines/cli.py
index 824c2a5fff16..f4c053858d14 100644
--- a/python/pyspark/pipelines/cli.py
+++ b/python/pyspark/pipelines/cli.py
@@ -205,17 +205,17 @@ def change_dir(path: Path) -> Generator[None, None, None]:
os.chdir(prev)
-def run(spec_path: Path, remote: str) -> None:
+def run(spec_path: Path) -> None:
"""Run the pipeline defined with the given spec."""
log_with_curr_timestamp(f"Loading pipeline spec from {spec_path}...")
spec = load_pipeline_spec(spec_path)
log_with_curr_timestamp("Creating Spark session...")
- spark_builder = SparkSession.builder.remote(remote)
+ spark_builder = SparkSession.builder
for key, value in spec.configuration.items():
spark_builder = spark_builder.config(key, value)
- spark = spark_builder.create()
+ spark = spark_builder.getOrCreate()
log_with_curr_timestamp("Creating dataflow graph...")
dataflow_graph_id = create_dataflow_graph(
@@ -244,9 +244,6 @@ if __name__ == "__main__":
# "run" subcommand
run_parser = subparsers.add_parser("run", help="Run a pipeline.")
run_parser.add_argument("--spec", help="Path to the pipeline spec.")
- run_parser.add_argument(
- "--remote", help="The Spark Connect remote to connect to.", required=True
- )
# "init" subcommand
init_parser = subparsers.add_parser(
@@ -274,6 +271,6 @@ if __name__ == "__main__":
else:
spec_path = find_pipeline_spec(Path.cwd())
- run(spec_path=spec_path, remote=args.remote)
+ run(spec_path=spec_path)
elif args.command == "init":
init(args.name)