This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8299600 [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
8299600 is described below
commit 8299600575024ca81127b7bf8ef48ae11fdd0594
Author: Xiangrui Meng <[email protected]>
AuthorDate: Fri Jun 28 15:09:57 2019 -0700
[SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
## What changes were proposed in this pull request?
Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition execution is an implementation detail, not a guarantee. I will submit another PR to add the same to the user guide, just to keep this PR minimal.
I didn't add "doctest: +SKIP" in the first commit so that it is easy to test locally.
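For readers skimming this commit, here is a minimal, self-contained sketch of the `SCALAR_ITER` usage that the new doctests document. It mirrors the first doctest in the patch below and assumes a local SparkSession with pandas and PyArrow installed:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))

# The wrapped function takes an iterator of pandas.Series batches and
# yields one output batch per input batch.
@pandas_udf("long", PandasUDFType.SCALAR_ITER)
def plus_one(batch_iter):
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+
```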
cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123


## How was this patch tested?
doctest
Closes #25005 from mengxr/SPARK-28056.2.
Authored-by: Xiangrui Meng <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
---
python/pyspark/sql/functions.py | 104 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 102 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 34f6593..5d1e69e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2951,7 +2951,107 @@ def pandas_udf(f=None, returnType=None, functionType=None):
Therefore, this can be used, for example, to ensure the length of each returned
`pandas.Series`, and can not be used as the column length.
- 2. GROUPED_MAP
+ 2. SCALAR_ITER
+
+ A scalar iterator UDF is semantically the same as the scalar Pandas UDF above except that the
+ wrapped Python function takes an iterator of batches as input instead of a single batch and,
+ instead of returning a single output batch, it yields output batches or explicitly returns a
+ generator or an iterator of output batches.
+ It is useful when the UDF execution requires initializing some state, e.g., loading a machine
+ learning model file to apply inference to every input batch.
+
+ .. note:: It is not guaranteed that one invocation of a scalar iterator UDF will process all
+ batches from one partition, although it is currently implemented this way.
+ Your code should not rely on this behavior because it might change in the future for
+ further optimization, e.g., one invocation could process multiple partitions.
+
+ Scalar iterator UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
+ :meth:`pyspark.sql.DataFrame.select`.
+
+ >>> import pandas as pd # doctest: +SKIP
+ >>> from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+ >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"]) # doctest: +SKIP
+ >>> df = spark.createDataFrame(pdf) # doctest: +SKIP
+
+ When the UDF is called with a single column that is not `StructType`, the input to the
+ underlying function is an iterator of `pd.Series`.
+
+ >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
+ ... def plus_one(batch_iter):
+ ...     for x in batch_iter:
+ ...         yield x + 1
+ ...
+ >>> df.select(plus_one(col("x"))).show() # doctest: +SKIP
+ +-----------+
+ |plus_one(x)|
+ +-----------+
+ |          2|
+ |          3|
+ |          4|
+ +-----------+
+
+ When the UDF is called with more than one column, the input to the underlying function is an
+ iterator of tuples of `pd.Series`.
+
+ >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
+ ... def multiply_two_cols(batch_iter):
+ ...     for a, b in batch_iter:
+ ...         yield a * b
+ ...
+ >>> df.select(multiply_two_cols(col("x"), col("x"))).show() # doctest: +SKIP
+ +-----------------------+
+ |multiply_two_cols(x, x)|
+ +-----------------------+
+ |                      1|
+ |                      4|
+ |                      9|
+ +-----------------------+
+
+ When the UDF is called with a single column that is `StructType`, the input to the underlying
+ function is an iterator of `pd.DataFrame`.
+
+ >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
+ ... def multiply_two_nested_cols(pdf_iter):
+ ...     for pdf in pdf_iter:
+ ...         yield pdf["a"] * pdf["b"]
+ ...
+ >>> df.select(
+ ...     multiply_two_nested_cols(
+ ...         struct(col("x").alias("a"), col("x").alias("b"))
+ ...     ).alias("y")
+ ... ).show() # doctest: +SKIP
+ +---+
+ |  y|
+ +---+
+ |  1|
+ |  4|
+ |  9|
+ +---+
+
+ In the UDF, you can initialize some state before processing batches. Wrap your code with
+ `try ... finally ...` or use context managers to ensure resources are released at the end
+ or in case of early termination.
+
+ >>> y_bc = spark.sparkContext.broadcast(1) # doctest: +SKIP
+ >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
+ ... def plus_y(batch_iter):
+ ...     y = y_bc.value # initialize some state
+ ...     try:
+ ...         for x in batch_iter:
+ ...             yield x + y
+ ...     finally:
+ ...         pass # release resources here, if any
+ ...
+ >>> df.select(plus_y(col("x"))).show() # doctest: +SKIP
+ +---------+
+ |plus_y(x)|
+ +---------+
+ |        2|
+ |        3|
+ |        4|
+ +---------+
+
+ 3. GROUPED_MAP
A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
The returnType should be a :class:`StructType` describing the schema of the returned
@@ -3030,7 +3130,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
.. seealso:: :meth:`pyspark.sql.GroupedData.apply`
- 3. GROUPED_AGG
+ 4. GROUPED_AGG
A grouped aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
The `returnType` should be a primitive data type, e.g., :class:`DoubleType`.
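For context on the trailing hunk above: a grouped aggregate UDF reduces each group to a single scalar and is used with `groupBy(...).agg(...)`. A rough sketch, not part of this patch; the data and column names are illustrative only:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

# GROUPED_AGG: one or more pandas.Series in, one scalar out per group.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

df.groupBy("id").agg(mean_udf(df["v"])).show()
```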
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]