This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2f12bc655875 [SPARK-54610][SDP][DOCS] Improve SDP Programming Guide
2f12bc655875 is described below
commit 2f12bc6558757fae41bf1fccf4f342fa78541e82
Author: Jacek Laskowski <[email protected]>
AuthorDate: Sun Jan 4 08:49:41 2026 -0800
[SPARK-54610][SDP][DOCS] Improve SDP Programming Guide
### What changes were proposed in this pull request?
Improved the SDP Programming Guide
### Why are the changes needed?
Better docs
### Does this PR introduce _any_ user-facing change?
Yes, given that the changes are user docs-related.
### How was this patch tested?
Reviewed locally
### Was this patch authored or co-authored using generative AI tooling?
No (unless auto-completion in IntelliJ IDEA counts)
Closes #53346 from jaceklaskowski/SPARK-54610-sdp-programming-guide.
Authored-by: Jacek Laskowski <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
docs/declarative-pipelines-programming-guide.md | 199 +++++++++++++++---------
1 file changed, 123 insertions(+), 76 deletions(-)
diff --git a/docs/declarative-pipelines-programming-guide.md
b/docs/declarative-pipelines-programming-guide.md
index 0ca04c644f1b..859258a430dd 100644
--- a/docs/declarative-pipelines-programming-guide.md
+++ b/docs/declarative-pipelines-programming-guide.md
@@ -24,7 +24,7 @@ license: |
## What is Spark Declarative Pipelines (SDP)?
-Spark Declarative Pipelines (SDP) is a declarative framework for building
reliable, maintainable, and testable data pipelines on Spark. SDP simplifies
ETL development by allowing you to focus on the transformations you want to
apply to your data, rather than the mechanics of pipeline execution.
+Spark Declarative Pipelines (SDP) is a declarative framework for building
reliable, maintainable, and testable data pipelines on Apache Spark. SDP
simplifies ETL development by allowing you to focus on the transformations you
want to apply to your data, rather than the mechanics of pipeline execution.
SDP is designed for both batch and streaming data processing, supporting
common use cases such as:
- Data ingestion from cloud storage (Amazon S3, Azure ADLS Gen2, Google Cloud
Storage)
@@ -62,10 +62,10 @@ SDP creates the table named `target_table` along with a
flow that reads new data
### Datasets
-A dataset is queryable object that's the output of one of more flows within a
pipeline. Flows in the pipeline can also read from datasets produced in the
pipeline.
+A dataset is a queryable object that's the output of one or more flows within
a pipeline. Flows in the pipeline can also read from datasets produced in the
pipeline.
- **Streaming Table** – a definition of a table and one or more streaming
flows written into it. Streaming tables support incremental processing of data,
allowing you to process only new data as it arrives.
-- **Materialized View** – is a view that is precomputed into a table. A
materialized view always has exactly one batch flow writing to it.
+- **Materialized View** – a view that is precomputed into a table. A
materialized view always has exactly one batch flow writing to it.
- **Temporary View** – a view that is scoped to an execution of the pipeline.
It can be referenced from flows within the pipeline. It's useful for
encapsulating transformations and intermediate logical entities that multiple
other elements of the pipeline depend on.
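To make the three dataset types concrete, here is a minimal Python sketch that previews the decorators covered later in this guide (`@dp.table`, `@dp.materialized_view`, `@dp.temporary_view`); the sample table and column names are illustrative:

```python
from pyspark import pipelines as dp
from pyspark.sql import DataFrame

# Streaming table: one or more streaming flows write into it incrementally.
@dp.table
def raw_trips() -> DataFrame:
    return spark.readStream.table("samples.nyctaxi.trips")

# Materialized view: precomputed into a table by exactly one batch flow.
@dp.materialized_view
def trips_by_zip() -> DataFrame:
    return spark.table("raw_trips").groupBy("pickup_zip").count()

# Temporary view: scoped to a run of the pipeline, useful for shared intermediate logic.
@dp.temporary_view
def nonempty_trips() -> DataFrame:
    return spark.table("raw_trips").where("trip_distance > 0")
```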
### Pipelines
@@ -74,11 +74,15 @@ A pipeline is the primary unit of development and execution
in SDP. A pipeline c
### Pipeline Projects
-A pipeline project is a set of source files that contain code that define the
datasets and flows that make up a pipeline. These source files can be `.py` or
`.sql` files.
+A pipeline project is a set of source files that contain code definitions of
the datasets and flows that make up a pipeline. The source files can be `.py`
or `.sql` files.
-A YAML-formatted pipeline spec file contains the top-level configuration for
the pipeline project. It supports the following fields:
-- **libraries** (Required) - Paths where source files can be found.
-- **storage** (Required) – A directory where checkpoints can be stored for
streams within the pipeline.
+It's conventional to name pipeline spec files `spark-pipeline.yml` or
`spark-pipeline.yaml`.
+
+A YAML-formatted pipeline spec file contains the top-level configuration for
the pipeline project with the following fields:
+
+- **name** (Required) - The name of the pipeline project.
+- **libraries** (Required) - Paths to the transformation source files in SQL or Python.
+- **storage** (Required) – A directory where checkpoints can be stored for
streaming tables within the pipeline.
- **database** (Optional) - The default target database for pipeline outputs.
**schema** can alternatively be used as an alias.
- **catalog** (Optional) - The default target catalog for pipeline outputs.
- **configuration** (Optional) - Map of Spark configuration properties.
@@ -90,175 +94,209 @@ name: my_pipeline
libraries:
- glob:
include: transformations/**
+storage: file:///absolute/path/to/storage/dir
catalog: my_catalog
database: my_db
configuration:
spark.sql.shuffle.partitions: "1000"
```
-It's conventional to name pipeline spec files `spark-pipeline.yml`.
-
The `spark-pipelines init` command, described below, makes it easy to generate
a pipeline project with default configuration and directory structure.
-
## The `spark-pipelines` Command Line Interface
-The `spark-pipelines` command line interface (CLI) is the primary way to
execute a pipeline. It also contains an `init` subcommand for generating a
pipeline project and a `dry-run` subcommand for validating a pipeline.
+The `spark-pipelines` command line interface (CLI) is the primary way to
manage a pipeline.
`spark-pipelines` is built on top of `spark-submit`, meaning that it supports
all cluster managers supported by `spark-submit`. It supports all
`spark-submit` arguments except for `--class`.
### `spark-pipelines init`
-`spark-pipelines init --name my_pipeline` generates a simple pipeline project,
inside a directory named "my_pipeline", including a spec file and example
definitions.
+`spark-pipelines init --name my_pipeline` generates a simple pipeline project
inside a directory named `my_pipeline`, including a spec file and example
transformation definitions.
### `spark-pipelines run`
-`spark-pipelines run` launches an execution of a pipeline and monitors its
progress until it completes. The `--spec` parameter allows selecting the
pipeline spec file. If not provided, the CLI will look in the current directory
and parent directories for a file named `spark-pipeline.yml` or
`spark-pipeline.yaml`.
+`spark-pipelines run` launches an execution of a pipeline and monitors its
progress until it completes.
+
+The `--spec` parameter allows selecting the pipeline spec file. If not
provided, the CLI will look in the current directory and parent directories for
one of the following files:
+
+* `spark-pipeline.yml`
+* `spark-pipeline.yaml`
### `spark-pipelines dry-run`
`spark-pipelines dry-run` launches an execution of a pipeline that doesn't
write or read any data, but catches many kinds of errors that would be caught
if the pipeline were to actually run. For example:
- Syntax errors – e.g. invalid Python or SQL code
-- Analysis errors – e.g. selecting from a table that doesn't exist or
selecting a column that doesn't exist
+- Analysis errors – e.g. selecting from a table or a column that doesn't exist
- Graph validation errors - e.g. cyclic dependencies
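Putting the CLI subcommands together, a typical end-to-end workflow might look like the following sketch (the explicit `--spec` path is illustrative; as noted above, `run` can also discover `spark-pipeline.yml` or `spark-pipeline.yaml` on its own):

```bash
# Generate a project skeleton with a spec file and example transformations
spark-pipelines init --name my_pipeline
cd my_pipeline

# Validate the pipeline without reading or writing any data
spark-pipelines dry-run

# Launch a run, optionally pointing at an explicit spec file
spark-pipelines run --spec ./spark-pipeline.yml
```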
## Programming with SDP in Python
-SDP Python functions are defined in the `pyspark.pipelines` module. Your
pipelines implemented with the Python API must import this module. It's common
to alias the module to `dp` to limit the number of characters you need to type
when using its APIs.
+The SDP Python API is defined in the `pyspark.pipelines` module.
+
+Your pipelines implemented with the Python API must import this module. It's
recommended to alias the module to `dp`.
```python
from pyspark import pipelines as dp
```
-### Creating a Materialized View with Python
+### Creating a Materialized View in Python
-The `@dp.materialized_view` decorator tells SDP to create a materialized view
based on the results returned by a function that performs a batch read:
+The `@dp.materialized_view` decorator tells SDP to create a materialized view
based on the results of a function that performs a batch read:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
@dp.materialized_view
-def basic_mv():
+def basic_mv() -> DataFrame:
return spark.table("samples.nyctaxi.trips")
```
-Optionally, you can specify the table name using the `name` argument:
+The name of the materialized view is derived from the name of the function.
+
+You can specify the name of the materialized view using the `name` argument:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
@dp.materialized_view(name="trips_mv")
-def basic_mv():
+def basic_mv() -> DataFrame:
return spark.table("samples.nyctaxi.trips")
```
-### Creating a Temporary View with Python
+### Creating a Temporary View in Python
-The `@dp.temporary_view` decorator tells SDP to create a temporary view based
on the results returned by a function that performs a batch read:
+The `@dp.temporary_view` decorator tells SDP to create a temporary view based
on the results of a function that performs a batch read:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
@dp.temporary_view
-def basic_tv():
+def basic_tv() -> DataFrame:
return spark.table("samples.nyctaxi.trips")
```
This temporary view can be read by other queries within the pipeline, but
can't be read outside the scope of the pipeline.
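Because other datasets in the same pipeline can read it, a downstream materialized view could consume the temporary view by name. A minimal sketch (the function name is hypothetical), following the querying pattern shown later in this guide:

```python
from pyspark import pipelines as dp
from pyspark.sql import DataFrame

@dp.materialized_view
def trips_from_tv() -> DataFrame:
    # Reads the temporary view defined above; this reference only resolves
    # within the pipeline, not from outside it.
    return spark.table("basic_tv")
```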
-### Creating a Streaming Table with Python
+### Creating a Streaming Table in Python
-Similarly, you can create a streaming table by using the `@dp.table` decorator
with a function that performs a streaming read:
+You can create a streaming table using the `@dp.table` decorator with a
function that performs a streaming read:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
@dp.table
-def basic_st():
+def basic_st() -> DataFrame:
return spark.readStream.table("samples.nyctaxi.trips")
```
-### Loading Data from a Streaming Source
+### Loading Data from Streaming Sources in Python
+
+SDP supports loading data from all the formats supported by Spark Structured
Streaming (`spark.readStream`).
-SDP supports loading data from all formats supported by Spark. For example,
you can create a streaming table whose query reads from a Kafka topic:
+For example, you can create a streaming table whose query reads from a Kafka
topic:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
@dp.table
-def ingestion_st():
+def ingestion_st() -> DataFrame:
return (
- spark.readStream.format("kafka")
+ spark.readStream
+ .format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "orders")
.load()
)
```
-For batch reads:
+### Loading Data from Batch Sources in Python
+
+SDP supports loading data from all the formats supported by Spark SQL
(`spark.read`).
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
@dp.materialized_view
-def batch_mv():
+def batch_mv() -> DataFrame:
return spark.read.format("json").load("/datasets/retail-org/sales_orders")
```
-### Querying Tables Defined in Your Pipeline
+### Querying Tables Defined in a Pipeline in Python
You can reference other tables defined in your pipeline in the same way you'd
reference tables defined outside your pipeline:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
from pyspark.sql.functions import col
@dp.table
-def orders():
+def orders() -> DataFrame:
return (
- spark.readStream.format("kafka")
+ spark.readStream
+ .format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "orders")
.load()
)
@dp.materialized_view
-def customers():
- return spark.read.format("csv").option("header",
True).load("/datasets/retail-org/customers")
+def customers() -> DataFrame:
+ return (
+ spark.read
+ .format("csv")
+ .option("header", True)
+ .load("/datasets/retail-org/customers")
+ )
@dp.materialized_view
-def customer_orders():
- return (spark.table("orders")
- .join(spark.table("customers"), "customer_id")
- .select("customer_id",
- "order_number",
- "state",
-
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
+def customer_orders() -> DataFrame:
+ return (
+ spark.table("orders")
+        .join(spark.table("customers"), "customer_id")
+        .select(
+            "customer_id",
+            "order_number",
+            "state",
+            col("order_datetime").cast("date").alias("order_date"),
)
)
@dp.materialized_view
-def daily_orders_by_state():
- return (spark.table("customer_orders")
+def daily_orders_by_state() -> DataFrame:
+ return (
+ spark.table("customer_orders")
.groupBy("state", "order_date")
- .count().withColumnRenamed("count", "order_count")
+ .count()
+ .withColumnRenamed("count", "order_count")
)
```
-### Creating Tables in a For Loop
+### Creating Tables in a For Loop in Python
You can use Python `for` loops to create multiple tables programmatically:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
from pyspark.sql.functions import collect_list, col
@dp.temporary_view()
-def customer_orders():
+def customer_orders() -> DataFrame:
orders = spark.table("samples.tpch.orders")
customer = spark.table("samples.tpch.customer")
- return (orders.join(customer, orders.o_custkey == customer.c_custkey)
+ return (
+ orders
+ .join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
@@ -267,19 +305,22 @@ def customer_orders():
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
- col("o_orderdate").alias("orderdate"))
+ col("o_orderdate").alias("orderdate"),
+ )
)
@dp.temporary_view()
-def nation_region():
+def nation_region() -> DataFrame:
nation = spark.table("samples.tpch.nation")
region = spark.table("samples.tpch.region")
- return (nation.join(region, nation.n_regionkey == region.r_regionkey)
+ return (
+ nation
+ .join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
- col("n_nationkey").alias("nationkey")
+ col("n_nationkey").alias("nationkey"),
)
)
@@ -289,11 +330,13 @@ region_list =
spark.table("samples.tpch.region").select(collect_list("r_name")).
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dp.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
- def regional_customer_orders(region_filter=region):
+ def regional_customer_orders(region_filter=region) -> DataFrame:
customer_orders = spark.table("customer_orders")
nation_region = spark.table("nation_region")
- return (customer_orders.join(nation_region, customer_orders.nationkey
== nation_region.nationkey)
+ return (
+ customer_orders
+ .join(nation_region, customer_orders.nationkey ==
nation_region.nationkey)
.select(
col("custkey"),
col("name"),
@@ -303,35 +346,37 @@ for region in region_list:
col("orderkey"),
col("orderstatus"),
col("totalprice"),
- col("orderdate")
- ).filter(f"region = '{region_filter}'")
+ col("orderdate"),
+ )
+ .filter(f"region = '{region_filter}'")
)
```
-### Using Multiple Flows to Write to a Single Target
+### Using Multiple Flows to Write to a Single Target in Python
-You can create multiple flows that append data to the same target:
+You can create multiple flows that append data to the same dataset:
```python
from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
# create a streaming table
dp.create_streaming_table("customers_us")
-# add the first append flow
+# define the first append flow
@dp.append_flow(target = "customers_us")
-def append1():
+def append_customers_us_west() -> DataFrame:
return spark.readStream.table("customers_us_west")
-# add the second append flow
+# define the second append flow
@dp.append_flow(target = "customers_us")
-def append2():
+def append_customers_us_east() -> DataFrame:
return spark.readStream.table("customers_us_east")
```
## Programming with SDP in SQL
-### Creating a Materialized View with SQL
+### Creating a Materialized View in SQL
The basic syntax for creating a materialized view with SQL is:
@@ -340,7 +385,7 @@ CREATE MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
```
-### Creating a Temporary View with SQL
+### Creating a Temporary View in SQL
The basic syntax for creating a temporary view with SQL is:
@@ -349,7 +394,7 @@ CREATE TEMPORARY VIEW basic_tv
AS SELECT * FROM samples.nyctaxi.trips;
```
-### Creating a Streaming Table with SQL
+### Creating a Streaming Table in SQL
When creating a streaming table, use the `STREAM` keyword to indicate
streaming semantics for the source:
@@ -358,7 +403,7 @@ CREATE STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
```
-### Querying Tables Defined in Your Pipeline
+### Querying Tables Defined in a Pipeline in SQL
You can reference other tables defined in your pipeline:
@@ -385,7 +430,7 @@ FROM customer_orders
GROUP BY state, order_date;
```
-### Using Multiple Flows to Write to a Single Target
+### Using Multiple Flows to Write to a Single Target in SQL
You can create multiple flows that append data to the same target:
@@ -393,13 +438,13 @@ You can create multiple flows that append data to the
same target:
-- create a streaming table
CREATE STREAMING TABLE customers_us;
--- add the first append flow
-CREATE FLOW append1
+-- define the first append flow
+CREATE FLOW append_customers_us_west
AS INSERT INTO customers_us
SELECT * FROM STREAM(customers_us_west);
--- add the second append flow
-CREATE FLOW append2
+-- define the second append flow
+CREATE FLOW append_customers_us_east
AS INSERT INTO customers_us
SELECT * FROM STREAM(customers_us_east);
```
@@ -409,12 +454,15 @@ SELECT * FROM STREAM(customers_us_east);
### Python Considerations
- SDP evaluates the code that defines a pipeline multiple times during
planning and pipeline runs. Python functions that define datasets should
include only the code required to define the table or view.
-- The function used to define a dataset must return a Spark DataFrame.
+- The function used to define a dataset must return a `pyspark.sql.DataFrame`.
- Never use methods that save or write to files or tables as part of your SDP
dataset code.
+- When using the `for` loop pattern to define datasets in Python, ensure that
the list of values passed to the `for` loop is always additive.
+
+Examples of Spark SQL operations that should never be used in SDP code:
-Examples of Apache Spark operations that should never be used in SDP code:
- `collect()`
- `count()`
+- `pivot()`
- `toPandas()`
- `save()`
- `saveAsTable()`
@@ -424,4 +472,3 @@ Examples of Apache Spark operations that should never be
used in SDP code:
### SQL Considerations
- The `PIVOT` clause is not supported in SDP SQL.
-- When using the `for` loop pattern to define datasets in Python, ensure that
the list of values passed to the `for` loop is always additive.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]