This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f12bc655875 [SPARK-54610][SDP][DOCS] Improve SDP Programming Guide
2f12bc655875 is described below

commit 2f12bc6558757fae41bf1fccf4f342fa78541e82
Author: Jacek Laskowski <[email protected]>
AuthorDate: Sun Jan 4 08:49:41 2026 -0800

    [SPARK-54610][SDP][DOCS] Improve SDP Programming Guide
    
    ### What changes were proposed in this pull request?
    
    Improved the SDP Programming Guide
    
    ### Why are the changes needed?
    
    Better docs
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, given that the changes are user docs-related.
    
    ### How was this patch tested?
    
    Reviewed locally
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No (unless auto-completion in IntelliJ IDEA counts)
    
    Closes #53346 from jaceklaskowski/SPARK-54610-sdp-programming-guide.
    
    Authored-by: Jacek Laskowski <[email protected]>
    Signed-off-by: Sandy Ryza <[email protected]>
---
 docs/declarative-pipelines-programming-guide.md | 199 +++++++++++++++---------
 1 file changed, 123 insertions(+), 76 deletions(-)

diff --git a/docs/declarative-pipelines-programming-guide.md 
b/docs/declarative-pipelines-programming-guide.md
index 0ca04c644f1b..859258a430dd 100644
--- a/docs/declarative-pipelines-programming-guide.md
+++ b/docs/declarative-pipelines-programming-guide.md
@@ -24,7 +24,7 @@ license: |
 
 ## What is Spark Declarative Pipelines (SDP)?
 
-Spark Declarative Pipelines (SDP) is a declarative framework for building 
reliable, maintainable, and testable data pipelines on Spark. SDP simplifies 
ETL development by allowing you to focus on the transformations you want to 
apply to your data, rather than the mechanics of pipeline execution.
+Spark Declarative Pipelines (SDP) is a declarative framework for building 
reliable, maintainable, and testable data pipelines on Apache Spark. SDP 
simplifies ETL development by allowing you to focus on the transformations you 
want to apply to your data, rather than the mechanics of pipeline execution.
 
 SDP is designed for both batch and streaming data processing, supporting 
common use cases such as:
 - Data ingestion from cloud storage (Amazon S3, Azure ADLS Gen2, Google Cloud 
Storage)
@@ -62,10 +62,10 @@ SDP creates the table named `target_table` along with a 
flow that reads new data
 
 ### Datasets
 
-A dataset is queryable object that's the output of one of more flows within a 
pipeline. Flows in the pipeline can also read from datasets produced in the 
pipeline.
+A dataset is a queryable object that's the output of one or more flows within 
a pipeline. Flows in the pipeline can also read from datasets produced in the 
pipeline.
 
 - **Streaming Table** – a definition of a table and one or more streaming 
flows written into it. Streaming tables support incremental processing of data, 
allowing you to process only new data as it arrives.
-- **Materialized View** – is a view that is precomputed into a table. A 
materialized view always has exactly one batch flow writing to it.
+- **Materialized View** – a view that is precomputed into a table. A 
materialized view always has exactly one batch flow writing to it.
 - **Temporary View** – a view that is scoped to an execution of the pipeline. 
It can be referenced from flows within the pipeline. It's useful for 
encapsulating transformations and intermediate logical entities that multiple 
other elements of the pipeline depend on.
 
 ### Pipelines
@@ -74,11 +74,15 @@ A pipeline is the primary unit of development and execution 
in SDP. A pipeline c
 
 ### Pipeline Projects
 
-A pipeline project is a set of source files that contain code that define the 
datasets and flows that make up a pipeline. These source files can be `.py` or 
`.sql` files.
+A pipeline project is a set of source files that contain code definitions of 
the datasets and flows that make up a pipeline. The source files can be `.py` 
or `.sql` files.
 
-A YAML-formatted pipeline spec file contains the top-level configuration for 
the pipeline project. It supports the following fields:
-- **libraries** (Required) - Paths where source files can be found.
-- **storage** (Required) – A directory where checkpoints can be stored for 
streams within the pipeline.
+It's conventional to name pipeline spec files `spark-pipeline.yml` or 
`spark-pipeline.yaml`.
+
+A YAML-formatted pipeline spec file contains the top-level configuration for 
the pipeline project with the following fields:
+
+- **name** (Required) - The name of the pipeline project.
+- **libraries** (Required) - Paths to the transformation source files, written in SQL or Python.
+- **storage** (Required) – A directory where checkpoints can be stored for 
streaming tables within the pipeline.
 - **database** (Optional) - The default target database for pipeline outputs. 
**schema** can alternatively be used as an alias.
 - **catalog** (Optional) - The default target catalog for pipeline outputs.
 - **configuration** (Optional) - Map of Spark configuration properties.
@@ -90,175 +94,209 @@ name: my_pipeline
 libraries:
   - glob:
       include: transformations/**
+storage: file:///absolute/path/to/storage/dir
 catalog: my_catalog
 database: my_db
 configuration:
   spark.sql.shuffle.partitions: "1000"
 ```
 
-It's conventional to name pipeline spec files `spark-pipeline.yml`.
-
 The `spark-pipelines init` command, described below, makes it easy to generate 
a pipeline project with default configuration and directory structure.
 
-
 ## The `spark-pipelines` Command Line Interface
 
-The `spark-pipelines` command line interface (CLI) is the primary way to 
execute a pipeline. It also contains an `init` subcommand for generating a 
pipeline project and a `dry-run` subcommand for validating a pipeline.
+The `spark-pipelines` command line interface (CLI) is the primary way to create, validate, and run a pipeline.
 
 `spark-pipelines` is built on top of `spark-submit`, meaning that it supports 
all cluster managers supported by `spark-submit`. It supports all 
`spark-submit` arguments except for `--class`.
 
 ### `spark-pipelines init`
 
-`spark-pipelines init --name my_pipeline` generates a simple pipeline project, 
inside a directory named "my_pipeline", including a spec file and example 
definitions.
+`spark-pipelines init --name my_pipeline` generates a simple pipeline project, 
inside a directory named `my_pipeline`, including a spec file and example 
transformation definitions.
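+
+For example, the following command generates a project under `my_pipeline`. The comments sketch the expected layout (a spec file plus example transformation definitions); the exact files may vary across Spark versions.
+
+```
+spark-pipelines init --name my_pipeline
+
+# Sketch of the generated layout (illustrative, not an exact listing):
+# my_pipeline/
+#   spark-pipeline.yml    <- pipeline spec file
+#   transformations/      <- example .py and .sql transformation definitions
+```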
 
 ### `spark-pipelines run`
 
-`spark-pipelines run` launches an execution of a pipeline and monitors its 
progress until it completes. The `--spec` parameter allows selecting the 
pipeline spec file. If not provided, the CLI will look in the current directory 
and parent directories for a file named `spark-pipeline.yml` or 
`spark-pipeline.yaml`.
+`spark-pipelines run` launches an execution of a pipeline and monitors its 
progress until it completes.
+
+The `--spec` parameter allows selecting the pipeline spec file. If not provided, the CLI looks in the current directory and its parent directories for one of the following files:
+
+* `spark-pipeline.yml`
+* `spark-pipeline.yaml`
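+
+For example, to run against an explicitly chosen spec file (the path below is illustrative):
+
+```
+spark-pipelines run --spec /path/to/spark-pipeline.yml
+```
+
+When invoked from inside a pipeline project directory, `spark-pipelines run` with no arguments picks up the spec file automatically.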
 
 ### `spark-pipelines dry-run`
 
 `spark-pipelines dry-run` launches an execution of a pipeline that doesn't 
write or read any data, but catches many kinds of errors that would be caught 
if the pipeline were to actually run. E.g.
 - Syntax errors – e.g. invalid Python or SQL code
-- Analysis errors – e.g. selecting from a table that doesn't exist or 
selecting a column that doesn't exist
+- Analysis errors – e.g. selecting from a table or a column that doesn't exist
 - Graph validation errors - e.g. cyclic dependencies
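+
+For example, from the pipeline project directory (assuming the spec file can be resolved the same way as for `spark-pipelines run`):
+
+```
+spark-pipelines dry-run
+```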
 
 ## Programming with SDP in Python
 
-SDP Python functions are defined in the `pyspark.pipelines` module. Your 
pipelines implemented with the Python API must import this module. It's common 
to alias the module to `dp` to limit the number of characters you need to type 
when using its APIs.
+The SDP Python API is defined in the `pyspark.pipelines` module.
+
+Your pipelines implemented with the Python API must import this module. It's 
recommended to alias the module to `dp`.
 
 ```python
 from pyspark import pipelines as dp
 ```
 
-### Creating a Materialized View with Python
+### Creating a Materialized View in Python
 
-The `@dp.materialized_view` decorator tells SDP to create a materialized view 
based on the results returned by a function that performs a batch read:
+The `@dp.materialized_view` decorator tells SDP to create a materialized view 
based on the results of a function that performs a batch read:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 
 @dp.materialized_view
-def basic_mv():
+def basic_mv() -> DataFrame:
     return spark.table("samples.nyctaxi.trips")
 ```
 
-Optionally, you can specify the table name using the `name` argument:
+The name of the materialized view is derived from the name of the function.
+
+You can specify the name of the materialized view using the `name` argument:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 
 @dp.materialized_view(name="trips_mv")
-def basic_mv():
+def basic_mv() -> DataFrame:
     return spark.table("samples.nyctaxi.trips")
 ```
 
-### Creating a Temporary View with Python
+### Creating a Temporary View in Python
 
-The `@dp.temporary_view` decorator tells SDP to create a temporary view based 
on the results returned by a function that performs a batch read:
+The `@dp.temporary_view` decorator tells SDP to create a temporary view based 
on the results of a function that performs a batch read:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 
 @dp.temporary_view
-def basic_tv():
+def basic_tv() -> DataFrame:
     return spark.table("samples.nyctaxi.trips")
 ```
 
 This temporary view can be read by other queries within the pipeline, but 
can't be read outside the scope of the pipeline.
 
-### Creating a Streaming Table with Python
+### Creating a Streaming Table in Python
 
-Similarly, you can create a streaming table by using the `@dp.table` decorator 
with a function that performs a streaming read:
+You can create a streaming table using the `@dp.table` decorator with a 
function that performs a streaming read:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 
 @dp.table
-def basic_st():
+def basic_st() -> DataFrame:
     return spark.readStream.table("samples.nyctaxi.trips")
 ```
 
-### Loading Data from a Streaming Source
+### Loading Data from Streaming Sources in Python
+
+SDP supports loading data from all the formats supported by Spark Structured 
Streaming (`spark.readStream`).
 
-SDP supports loading data from all formats supported by Spark. For example, 
you can create a streaming table whose query reads from a Kafka topic:
+For example, you can create a streaming table whose query reads from a Kafka 
topic:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 
 @dp.table
-def ingestion_st():
+def ingestion_st() -> DataFrame:
     return (
-        spark.readStream.format("kafka")
+        spark.readStream
+        .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("subscribe", "orders")
         .load()
     )
 ```
 
-For batch reads:
+### Loading Data from Batch Sources in Python
+
+SDP supports loading data from all the formats supported by Spark SQL 
(`spark.read`). For example:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 
 @dp.materialized_view
-def batch_mv():
+def batch_mv() -> DataFrame:
     return spark.read.format("json").load("/datasets/retail-org/sales_orders")
 ```
 
-### Querying Tables Defined in Your Pipeline
+### Querying Tables Defined in a Pipeline in Python
 
 You can reference other tables defined in your pipeline in the same way you'd 
reference tables defined outside your pipeline:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 from pyspark.sql.functions import col
 
 @dp.table
-def orders():
+def orders() -> DataFrame:
     return (
-        spark.readStream.format("kafka")
+        spark.readStream
+        .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("subscribe", "orders")
         .load()
     )
 
 @dp.materialized_view
-def customers():
-    return spark.read.format("csv").option("header", 
True).load("/datasets/retail-org/customers")
+def customers() -> DataFrame:
+    return (
+        spark.read
+        .format("csv")
+        .option("header", True)
+        .load("/datasets/retail-org/customers")
+    )
 
 @dp.materialized_view
-def customer_orders():
-    return (spark.table("orders")
-        .join(spark.table("customers"), "customer_id")
-        .select("customer_id",
-            "order_number",
-            "state",
-            
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
+def customer_orders() -> DataFrame:
+    return (
+        spark.table("orders")
+        .join(spark.table("customers"), "customer_id")
+        .select(
+            "customer_id",
+            "order_number",
+            "state",
+            col("order_datetime").cast("date").alias("order_date"),
+        )
     )
 
 @dp.materialized_view
-def daily_orders_by_state():
-    return (spark.table("customer_orders")
+def daily_orders_by_state() -> DataFrame:
+    return (
+        spark.table("customer_orders")
         .groupBy("state", "order_date")
-        .count().withColumnRenamed("count", "order_count")
+        .count()
+        .withColumnRenamed("count", "order_count")
     )
 ```
 
-### Creating Tables in a For Loop
+### Creating Tables in a For Loop in Python
 
 You can use Python `for` loops to create multiple tables programmatically:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 from pyspark.sql.functions import collect_list, col
 
 @dp.temporary_view()
-def customer_orders():
+def customer_orders() -> DataFrame:
     orders = spark.table("samples.tpch.orders")
     customer = spark.table("samples.tpch.customer")
 
-    return (orders.join(customer, orders.o_custkey == customer.c_custkey)
+    return (
+        orders
+        .join(customer, orders.o_custkey == customer.c_custkey)
         .select(
             col("c_custkey").alias("custkey"),
             col("c_name").alias("name"),
@@ -267,19 +305,22 @@ def customer_orders():
             col("o_orderkey").alias("orderkey"),
             col("o_orderstatus").alias("orderstatus"),
             col("o_totalprice").alias("totalprice"),
-            col("o_orderdate").alias("orderdate"))
+            col("o_orderdate").alias("orderdate"),
+        )
     )
 
 @dp.temporary_view()
-def nation_region():
+def nation_region() -> DataFrame:
     nation = spark.table("samples.tpch.nation")
     region = spark.table("samples.tpch.region")
 
-    return (nation.join(region, nation.n_regionkey == region.r_regionkey)
+    return (
+        nation
+        .join(region, nation.n_regionkey == region.r_regionkey)
         .select(
             col("n_name").alias("nation"),
             col("r_name").alias("region"),
-            col("n_nationkey").alias("nationkey")
+            col("n_nationkey").alias("nationkey"),
         )
     )
 
@@ -289,11 +330,13 @@ region_list = 
spark.table("samples.tpch.region").select(collect_list("r_name")).
 # Iterate through region names to create new region-specific materialized views
 for region in region_list:
     @dp.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
-    def regional_customer_orders(region_filter=region):
+    def regional_customer_orders(region_filter=region) -> DataFrame:
         customer_orders = spark.table("customer_orders")
         nation_region = spark.table("nation_region")
 
-        return (customer_orders.join(nation_region, customer_orders.nationkey 
== nation_region.nationkey)
+        return (
+            customer_orders
+            .join(nation_region, customer_orders.nationkey == 
nation_region.nationkey)
             .select(
                 col("custkey"),
                 col("name"),
@@ -303,35 +346,37 @@ for region in region_list:
                 col("orderkey"),
                 col("orderstatus"),
                 col("totalprice"),
-                col("orderdate")
-            ).filter(f"region = '{region_filter}'")
+                col("orderdate"),
+            )
+            .filter(f"region = '{region_filter}'")
         )
 ```
 
-### Using Multiple Flows to Write to a Single Target
+### Using Multiple Flows to Write to a Single Target in Python
 
-You can create multiple flows that append data to the same target:
+You can create multiple flows that append data to the same dataset:
 
 ```python
 from pyspark import pipelines as dp
+from pyspark.sql import DataFrame
 
 # create a streaming table
 dp.create_streaming_table("customers_us")
 
-# add the first append flow
+# define the first append flow
 @dp.append_flow(target = "customers_us")
-def append1():
+def append_customers_us_west() -> DataFrame:
     return spark.readStream.table("customers_us_west")
 
-# add the second append flow
+# define the second append flow
 @dp.append_flow(target = "customers_us")
-def append2():
+def append_customers_us_east() -> DataFrame:
     return spark.readStream.table("customers_us_east")
 ```
 
 ## Programming with SDP in SQL
 
-### Creating a Materialized View with SQL
+### Creating a Materialized View in SQL
 
 The basic syntax for creating a materialized view with SQL is:
 
@@ -340,7 +385,7 @@ CREATE MATERIALIZED VIEW basic_mv
 AS SELECT * FROM samples.nyctaxi.trips;
 ```
 
-### Creating a Temporary View with SQL
+### Creating a Temporary View in SQL
 
 The basic syntax for creating a temporary view with SQL is:
 
@@ -349,7 +394,7 @@ CREATE TEMPORARY VIEW basic_tv
 AS SELECT * FROM samples.nyctaxi.trips;
 ```
 
-### Creating a Streaming Table with SQL
+### Creating a Streaming Table in SQL
 
 When creating a streaming table, use the `STREAM` keyword to indicate 
streaming semantics for the source:
 
@@ -358,7 +403,7 @@ CREATE STREAMING TABLE basic_st
 AS SELECT * FROM STREAM samples.nyctaxi.trips;
 ```
 
-### Querying Tables Defined in Your Pipeline
+### Querying Tables Defined in a Pipeline in SQL
 
 You can reference other tables defined in your pipeline:
 
@@ -385,7 +430,7 @@ FROM customer_orders
 GROUP BY state, order_date;
 ```
 
-### Using Multiple Flows to Write to a Single Target
+### Using Multiple Flows to Write to a Single Target in SQL
 
 You can create multiple flows that append data to the same target:
 
@@ -393,13 +438,13 @@ You can create multiple flows that append data to the 
same target:
 -- create a streaming table
 CREATE STREAMING TABLE customers_us;
 
--- add the first append flow
-CREATE FLOW append1
+-- define the first append flow
+CREATE FLOW append_customers_us_west
 AS INSERT INTO customers_us
 SELECT * FROM STREAM(customers_us_west);
 
--- add the second append flow
-CREATE FLOW append2
+-- define the second append flow
+CREATE FLOW append_customers_us_east
 AS INSERT INTO customers_us
 SELECT * FROM STREAM(customers_us_east);
 ```
@@ -409,12 +454,15 @@ SELECT * FROM STREAM(customers_us_east);
 ### Python Considerations
 
 - SDP evaluates the code that defines a pipeline multiple times during 
planning and pipeline runs. Python functions that define datasets should 
include only the code required to define the table or view.
-- The function used to define a dataset must return a Spark DataFrame.
+- The function used to define a dataset must return a `pyspark.sql.DataFrame`.
 - Never use methods that save or write to files or tables as part of your SDP 
dataset code.
+- When using the `for` loop pattern to define datasets in Python, ensure that 
the list of values passed to the `for` loop is always additive.
+
+Examples of Spark SQL operations that should never be used in SDP code:
 
-Examples of Apache Spark operations that should never be used in SDP code:
 - `collect()`
 - `count()`
+- `pivot()`
 - `toPandas()`
 - `save()`
 - `saveAsTable()`
@@ -424,4 +472,3 @@ Examples of Apache Spark operations that should never be 
used in SDP code:
 ### SQL Considerations
 
 - The `PIVOT` clause is not supported in SDP SQL.
-- When using the `for` loop pattern to define datasets in Python, ensure that 
the list of values passed to the `for` loop is always additive.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
