This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d970c56e7d3 [SPARK-40264][ML][DOCS] Supplement docstring in pyspark.ml.functions.predict_batch_udf
d970c56e7d3 is described below
commit d970c56e7d3971c657e37d8267abf5b77c34c8ba
Author: Lee Yang <[email protected]>
AuthorDate: Sat Jan 21 16:13:48 2023 +0900
[SPARK-40264][ML][DOCS] Supplement docstring in pyspark.ml.functions.predict_batch_udf
### What changes were proposed in this pull request?
Follow-up edits to the pydoc from #37734, per [request](https://github.com/apache/spark/pull/37734#issuecomment-1384092385).
### Why are the changes needed?
Adds examples of multiple-output models (using a StructType return_type), both sketched below, for:
- returning multiple fields of array type
- returning a list of dicts for row-oriented output
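A minimal sketch of the two output shapes (the `sum1`/`sum2` fields and the sum logic mirror the docstring examples in this diff; the functions themselves are illustrative):

```python
import numpy as np
from typing import List, Mapping

def predict_columnar(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]:
    # columnar form: a dict of named numpy arrays, one entry per struct field
    return {"sum1": np.sum(x1, axis=1), "sum2": np.sum(x2, axis=1)}

def predict_rows(x1: np.ndarray, x2: np.ndarray) -> List[Mapping[str, float]]:
    # row-oriented form: a list of dicts, one dict per input row
    return [{"sum1": np.sum(x1[i]), "sum2": np.sum(x2[i])} for i in range(len(x1))]
```

Both forms pair with a `StructType` return_type whose field names match the dict keys.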
### Does this PR introduce _any_ user-facing change?
Yes, updated pydocs for the `predict_batch_udf` function.
### How was this patch tested?
Documentation only.
Closes #39628 from leewyang/SPARK-40264-2.
Authored-by: Lee Yang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/docs/source/reference/pyspark.ml.rst | 1 +
python/pyspark/ml/functions.py | 675 +++++++++++++++-------------
2 files changed, 375 insertions(+), 301 deletions(-)
diff --git a/python/docs/source/reference/pyspark.ml.rst b/python/docs/source/reference/pyspark.ml.rst
index b9b855649c7..6c807514687 100644
--- a/python/docs/source/reference/pyspark.ml.rst
+++ b/python/docs/source/reference/pyspark.ml.rst
@@ -196,6 +196,7 @@ Functions
array_to_vector
vector_to_array
+ predict_batch_udf
Vector and Matrix
diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
index 53ad871d3c7..1ed2c294356 100644
--- a/python/pyspark/ml/functions.py
+++ b/python/pyspark/ml/functions.py
@@ -355,289 +355,28 @@ def predict_batch_udf(
batch_size: int,
input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | None = None,
) -> UserDefinedFunctionLike:
- """Given a function which loads a model and makes batch predictions,
returns a Pandas UDF for
- inferencing over that model.
-
- The returned UDF does the following on each DataFrame partition:
- - calls `make_predict_fn` to load the model and cache its `predict_fn`.
- batches the input records as numpy arrays and invokes `predict_fn` on each batch.
-
- This assumes that the `make_predict_fn` encapsulates all of the necessary dependencies for
- running the model or the Spark executor environment already satisfies all runtime requirements.
-
- For the conversion of Spark DataFrame to numpy arrays, there is a one-to-one mapping between the
- input arguments of the `predict_fn` (returned by the `make_predict_fn`) and the input columns to
- the UDF (returned by the `predict_batch_udf`) at runtime. Each input column will be converted
- as follows:
- - scalar column -> np.ndarray
- - tensor column + tensor shape -> np.ndarray
-
- Note that tensor columns in the Spark DataFrame must be represented as a flattened 1-D array,
- and multiple scalar columns can be combined into a single tensor column using the standard
- `array()` PySpark SQL function.
-
- Example (tensor column):
-
- Input DataFrame has a single column with a flattened tensor value, represented as an array of
- float.
- ```
- from pyspark.ml.functions import predict_batch_udf
-
- def make_mnist_fn():
- # load/init happens once per python worker
- import tensorflow as tf
- model = tf.keras.models.load_model('/path/to/mnist_model')
-
- # predict on batches of tasks/partitions, using cached model
- def predict(inputs: np.ndarray) -> np.ndarray:
- # inputs.shape = [batch_size, 784]
- # outputs.shape = [batch_size, 10], return_type = ArrayType(FloatType())
- return model.predict(inputs)
-
- return predict
-
- mnist_udf = predict_batch_udf(make_mnist_fn,
- return_type=ArrayType(FloatType()),
- batch_size=100,
- input_tensor_shapes=[[784]])
-
- df = spark.read.parquet("/path/to/mnist_data")
- df.show(5)
- # +--------------------+
- # | data|
- # +--------------------+
- # |[0.0, 0.0, 0.0, 0...|
- # |[0.0, 0.0, 0.0, 0...|
- # |[0.0, 0.0, 0.0, 0...|
- # |[0.0, 0.0, 0.0, 0...|
- # |[0.0, 0.0, 0.0, 0...|
- # +--------------------+
-
- df.withColumn("preds", mnist_udf("data")).show(5)
- # +--------------------+--------------------+
- # | data| preds|
- # +--------------------+--------------------+
- # |[0.0, 0.0, 0.0, 0...|[-13.511008, 8.84...|
- # |[0.0, 0.0, 0.0, 0...|[-5.3957458, -2.2...|
- # |[0.0, 0.0, 0.0, 0...|[-7.2014456, -8.8...|
- # |[0.0, 0.0, 0.0, 0...|[-19.466187, -13....|
- # |[0.0, 0.0, 0.0, 0...|[-5.7757926, -7.8...|
- # +--------------------+--------------------+
- ```
-
- Example (scalar column):
-
- Input DataFrame has a single scalar column, which will be passed to the `predict` function as
- a 1-D numpy array.
- ```
- import numpy as np
- import pandas as pd
- from pyspark.ml.functions import predict_batch_udf
- from pyspark.sql.types import FloatType
-
- df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
- df.show(5)
- # +---+
- # | 0|
- # +---+
- # | 0|
- # | 1|
- # | 2|
- # | 3|
- # | 4|
- # +---+
-
- def make_times_two_fn():
- def predict(inputs: np.ndarray) -> np.ndarray:
- # inputs.shape = [batch_size]
- # outputs.shape = [batch_size], return_type = FloatType()
- return inputs * 2
-
- return predict
-
- times_two_udf = predict_batch_udf(make_times_two_fn,
- return_type=FloatType(),
- batch_size=10)
-
- df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
- df.withColumn("x2", times_two_udf("0")).show(5)
- # +---+---+
- # | 0| x2|
- # +---+---+
- # | 0|0.0|
- # | 1|2.0|
- # | 2|4.0|
- # | 3|6.0|
- # | 4|8.0|
- # +---+---+
- ```
-
- Example (multiple scalar columns):
-
- Input DataFrame has multiple columns of scalar values. If the user-provided `predict` function
- expects a single input, then the user must combine the multiple columns into a single tensor
- using `pyspark.sql.functions.array`.
- ```
- import numpy as np
- import pandas as pd
- from pyspark.ml.functions import predict_batch_udf
- from pyspark.sql.functions import array
-
- data = np.arange(0, 1000, dtype=np.float64).reshape(-1, 4)
- pdf = pd.DataFrame(data, columns=['a','b','c','d'])
- df = spark.createDataFrame(pdf)
- # +----+----+----+----+
- # | a| b| c| d|
- # +----+----+----+----+
- # | 0.0| 1.0| 2.0| 3.0|
- # | 4.0| 5.0| 6.0| 7.0|
- # | 8.0| 9.0|10.0|11.0|
- # |12.0|13.0|14.0|15.0|
- # |16.0|17.0|18.0|19.0|
- # +----+----+----+----+
-
- def make_sum_fn():
- def predict(inputs: np.ndarray) -> np.ndarray:
- # inputs.shape = [batch_size, 4]
- # outputs.shape = [batch_size], return_type = FloatType()
- return np.sum(inputs, axis=1)
-
- return predict
-
- sum_udf = predict_batch_udf(make_sum_fn,
- return_type=FloatType(),
- batch_size=10,
- input_tensor_shapes=[[4]])
-
- df.withColumn("sum", sum_udf(array("a", "b", "c", "d"))).show(5)
- # +----+----+----+----+----+
- # | a| b| c| d| sum|
- # +----+----+----+----+----+
- # | 0.0| 1.0| 2.0| 3.0| 6.0|
- # | 4.0| 5.0| 6.0| 7.0|22.0|
- # | 8.0| 9.0|10.0|11.0|38.0|
- # |12.0|13.0|14.0|15.0|54.0|
- # |16.0|17.0|18.0|19.0|70.0|
- # +----+----+----+----+----+
- ```
-
- If the `predict` function expects multiple inputs, then the number of selected input columns
- must match the number of expected inputs.
- ```
- def make_sum_fn():
- def predict(x1: np.ndarray, x2: np.ndarray, x3: np.ndarray, x4: np.ndarray) -> np.ndarray:
- # xN.shape = [batch_size]
- # outputs.shape = [batch_size], return_type = FloatType()
- return x1 + x2 + x3 + x4
-
- return predict
-
- sum_udf = predict_batch_udf(make_sum_fn,
- return_type=FloatType(),
- batch_size=10)
-
- df.withColumn("sum", sum_udf("a", "b", "c", "d")).show(5)
- # +----+----+----+----+----+
- # | a| b| c| d| sum|
- # +----+----+----+----+----+
- # | 0.0| 1.0| 2.0| 3.0| 6.0|
- # | 4.0| 5.0| 6.0| 7.0|22.0|
- # | 8.0| 9.0|10.0|11.0|38.0|
- # |12.0|13.0|14.0|15.0|54.0|
- # |16.0|17.0|18.0|19.0|70.0|
- # +----+----+----+----+----+
- ```
-
- Example (multiple tensor columns):
-
- Input DataFrame has multiple columns, where each column is a tensor. The number of columns
- should match the number of expected inputs for the user-provided `predict` function.
- ```
- import numpy as np
- import pandas as pd
- from pyspark.ml.functions import predict_batch_udf
- from pyspark.sql.types import FloatType, StructType, StructField
- from typing import Mapping
-
- data = np.arange(0, 1000, dtype=np.float64).reshape(-1, 4)
- pdf = pd.DataFrame(data, columns=['a','b','c','d'])
- pdf_tensor = pd.DataFrame()
- pdf_tensor['t1'] = pdf.values.tolist()
- pdf_tensor['t2'] = pdf.drop(columns='d').values.tolist()
- df = spark.createDataFrame(pdf_tensor)
- df.show(5)
- # +--------------------+------------------+
- # | t1| t2|
- # +--------------------+------------------+
- # |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]|
- # |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|
- # |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|
- # |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|
- # |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|
- # +--------------------+------------------+
-
- def make_multi_sum_fn():
- def predict(x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
- # x1.shape = [batch_size, 4]
- # x2.shape = [batch_size, 3]
- # outputs.shape = [batch_size], result_type = FloatType()
- return np.sum(x1, axis=1) + np.sum(x2, axis=1)
-
- return predict
-
- multi_sum_udf = predict_batch_udf(
- make_multi_sum_fn,
- return_type=FloatType(),
- batch_size=5,
- input_tensor_shapes=[[4], [3]],
- )
+ """Given a function which loads a model and returns a `predict` function
for inference over a
+ batch of numpy inputs, returns a Pandas UDF wrapper for inference over a
Spark DataFrame.
- df.withColumn("sum", multi_sum_udf("t1", "t2")).show(5)
- # +--------------------+------------------+-----+
- # | t1| t2| sum|
- # +--------------------+------------------+-----+
- # |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 9.0|
- # |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]| 37.0|
- # |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]| 65.0|
- # |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]| 93.0|
- # |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|121.0|
- # +--------------------+------------------+-----+
-
- # Note that some models can provide multiple outputs. These can be returned as a dictionary
- # of named values, which can be represented in columnar (or row-based) formats.
-
- def make_multi_sum_fn():
- def predict_columnar(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]:
- # x1.shape = [batch_size, 4]
- # x2.shape = [batch_size, 3]
- return {
- "sum1": np.sum(x1, axis=1),
- "sum2": np.sum(x2, axis=1)
- } # return_type = StructType()
-
- return predict_columnar
-
- multi_sum_udf = predict_batch_udf(
- make_multi_sum_fn,
- return_type=StructType([
- StructField("sum1", FloatType(), True),
- StructField("sum2", FloatType(), True)
- ])
- batch_size=5,
- input_tensor_shapes=[[4], [3]],
- )
+ The returned Pandas UDF does the following on each DataFrame partition:
+
+ * calls the `make_predict_fn` to load the model and cache its `predict` function.
+ * batches the input records as numpy arrays and invokes `predict` on each batch.
+
+ Note: this assumes that the `make_predict_fn` encapsulates all of the necessary dependencies for
+ running the model, or the Spark executor environment already satisfies all runtime requirements.
- df.withColumn("preds", multi_sum_udf("t1", "t2")).select("t1", "t2",
"preds.*").show(5)
- # +--------------------+------------------+----+----+
- # | t1| t2|sum1|sum2|
- # +--------------------+------------------+----+----+
- # |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 6.0| 3.0|
- # |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|22.0|15.0|
- # |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|38.0|27.0|
- # |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|54.0|39.0|
- # |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|70.0|51.0|
- # +--------------------+------------------+----+----+
- ```
+ For the conversion of the Spark DataFrame to numpy arrays, there is a one-to-one mapping between
+ the input arguments of the `predict` function (returned by the `make_predict_fn`) and the input
+ columns sent to the Pandas UDF (returned by the `predict_batch_udf`) at runtime. Each input
+ column will be converted as follows:
+
+ * scalar column -> 1-dim np.ndarray
+ * tensor column + tensor shape -> N-dim np.ndarray
+
+ Note that any tensor columns in the Spark DataFrame must be represented as a flattened
+ one-dimensional array, and multiple scalar columns can be combined into a single tensor column
+ using the standard :py:func:`pyspark.sql.functions.array()` function.
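For instance, a quick sketch of this flattening convention (the column names and data are illustrative, and `spark` is an active SparkSession as in the examples below):

```python
import numpy as np
import pandas as pd
from pyspark.sql.functions import array

# each row's 2x2 tensor [[a, b], [c, d]] must be stored flattened as [a, b, c, d];
# passing input_tensor_shapes=[[2, 2]] would restore the shape inside predict().
pdf = pd.DataFrame(np.arange(8.0).reshape(-1, 4), columns=["a", "b", "c", "d"])
df = spark.createDataFrame(pdf)

# combine the four scalar columns into a single flattened tensor column
df_tensor = df.select(array("a", "b", "c", "d").alias("t1"))
```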
.. versionadded:: 3.4.0
@@ -647,37 +386,371 @@ def predict_batch_udf(
Function which is responsible for loading a model and returning a
:py:class:`PredictBatchFunction` which takes one or more numpy arrays as input and returns
one of the following:
- - a numpy array (for a single output)
- - a dictionary of named numpy arrays (for multiple outputs)
- - a row-oriented list of dictionaries (for multiple outputs).
+
+ * a numpy array (for a single output)
+ * a dictionary of named numpy arrays (for multiple outputs)
+ * a row-oriented list of dictionaries (for multiple outputs).
+
For a dictionary of named numpy arrays, the arrays can only be one or two dimensional, since
- higher dimension arrays are not supported. For a row-oriented list of dictionaries, each
+ higher dimensional arrays are not supported. For a row-oriented list of dictionaries, each
element in the dictionary must be either a scalar or one-dimensional array.
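Sketched signatures for the three return forms (function and field names are illustrative, not from the library):

```python
import numpy as np
from typing import List, Mapping

def predict_single(inputs: np.ndarray) -> np.ndarray:
    # single output: one numpy array
    return inputs * 2

def predict_columnar(inputs: np.ndarray) -> Mapping[str, np.ndarray]:
    # multiple outputs, columnar: named arrays, at most two-dimensional
    return {"out1": inputs * 2, "out2": inputs * 3}

def predict_rows(inputs: np.ndarray) -> List[Mapping[str, float]]:
    # multiple outputs, row-oriented: one dict per row, scalar or 1-D values
    return [{"out1": float(x) * 2, "out2": float(x) * 3} for x in inputs]
```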
- return_type : :class:`pspark.sql.types.DataType` or str.
+ return_type : :py:class:`pyspark.sql.types.DataType` or str.
Spark SQL datatype for the expected output:
- - Scalar (e.g. IntegerType, FloatType) --> 1-dim numpy array.
- - ArrayType --> 2-dim numpy array.
- - StructType --> dict with keys matching struct fields.
- - StructType --> list of dict with keys matching struct fields, for models like the
- [Huggingface pipeline for sentiment analysis](https://huggingface.co/docs/transformers/quicktour#pipeline-usage] # noqa: E501
+
+ * Scalar (e.g. IntegerType, FloatType) --> 1-dim numpy array.
+ * ArrayType --> 2-dim numpy array.
+ * StructType --> dict with keys matching struct fields.
+ * StructType --> list of dict with keys matching struct fields, for models like the
+ `Huggingface pipeline for sentiment analysis
+ <https://huggingface.co/docs/transformers/quicktour#pipeline-usage>`_.
+
batch_size : int
- Batch size to use for inference, note that this is typically a limitation of the model
- and/or the hardware resources and is usually smaller than the Spark partition size.
- input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | None
- Optional input tensor shapes for models with tensor inputs. This can be a list of shapes,
+ Batch size to use for inference. This is typically a limitation of the model
+ and/or available hardware resources and is usually smaller than the Spark partition size.
+ input_tensor_shapes : List[List[int] | None] | Mapping[int, List[int]] | None, optional
+ Input tensor shapes for models with tensor inputs. This can be a list of shapes,
where each shape is a list of integers or None (for scalar inputs). Alternatively, this
can be represented by a "sparse" dictionary, where the keys are the integer indices of the
inputs, and the values are the shapes. Each tensor input value in the Spark DataFrame must
be represented as a single column containing a flattened 1-D array. The provided
- input_tensor_shapes will be used to reshape the flattened array into expected tensor shape.
- For the list form, the order of the tensor shapes must match the order of the selected
- DataFrame columns. The batch dimension (typically -1 or None in the first dimension) should
- not be included, since it will be determined by the batch_size argument. Tabular datasets
- with scalar-valued columns should not provide this argument.
+ `input_tensor_shapes` will be used to reshape the flattened array into the expected tensor
+ shape. For the list form, the order of the tensor shapes must match the order of the
+ selected DataFrame columns. The batch dimension (typically -1 or None in the first
+ dimension) should not be included, since it will be determined by the batch_size argument.
+ Tabular datasets with scalar-valued columns should not provide this argument.
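As a concrete sketch of the two spellings (positions and shapes are illustrative): for a `predict` function taking a [4]-tensor, a scalar, and a [3]-tensor, in that column order:

```python
# list form: one entry per selected column, None for scalar inputs
shapes_as_list = [[4], None, [3]]

# equivalent "sparse" dict form: only the tensor inputs, keyed by column index
shapes_as_dict = {0: [4], 2: [3]}
```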
Returns
-------
- A pandas_udf for predicting a batch.
+ :py:class:`UserDefinedFunctionLike`
+ A Pandas UDF for model inference on a Spark DataFrame.
+
+ Examples
+ --------
+ For a pre-trained TensorFlow MNIST model with two-dimensional input images represented as a
+ flattened tensor value stored in a single Spark DataFrame column of type `array<float>`:
+
+ .. code-block:: python
+
+ import numpy as np
+ from pyspark.ml.functions import predict_batch_udf
+ from pyspark.sql.types import ArrayType, FloatType
+
+ def make_mnist_fn():
+ # load/init happens once per python worker
+ import tensorflow as tf
+ model = tf.keras.models.load_model('/path/to/mnist_model')
+
+ # predict on batches of tasks/partitions, using cached model
+ def predict(inputs: np.ndarray) -> np.ndarray:
+ # inputs.shape = [batch_size, 784], see input_tensor_shapes
+ # outputs.shape = [batch_size, 10], see return_type
+ return model.predict(inputs)
+
+ return predict
+
+ mnist_udf = predict_batch_udf(make_mnist_fn,
+ return_type=ArrayType(FloatType()),
+ batch_size=100,
+ input_tensor_shapes=[[784]])
+
+ df = spark.read.parquet("/path/to/mnist_data")
+ df.show(5)
+ # +--------------------+
+ # | data|
+ # +--------------------+
+ # |[0.0, 0.0, 0.0, 0...|
+ # |[0.0, 0.0, 0.0, 0...|
+ # |[0.0, 0.0, 0.0, 0...|
+ # |[0.0, 0.0, 0.0, 0...|
+ # |[0.0, 0.0, 0.0, 0...|
+ # +--------------------+
+
+ df.withColumn("preds", mnist_udf("data")).show(5)
+ # +--------------------+--------------------+
+ # | data| preds|
+ # +--------------------+--------------------+
+ # |[0.0, 0.0, 0.0, 0...|[-13.511008, 8.84...|
+ # |[0.0, 0.0, 0.0, 0...|[-5.3957458, -2.2...|
+ # |[0.0, 0.0, 0.0, 0...|[-7.2014456, -8.8...|
+ # |[0.0, 0.0, 0.0, 0...|[-19.466187, -13....|
+ # |[0.0, 0.0, 0.0, 0...|[-5.7757926, -7.8...|
+ # +--------------------+--------------------+
+
+ To demonstrate usage with different combinations of input and output types, the following
+ examples just use simple mathematical transforms as the models.
+
+ * Single scalar column
+ Input DataFrame has a single scalar column, which will be passed to the `predict`
+ function as a 1-D numpy array.
+
+ >>> import numpy as np
+ >>> import pandas as pd
+ >>> from pyspark.ml.functions import predict_batch_udf
+ >>> from pyspark.sql.types import FloatType
+ >>>
+ >>> df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
+ >>> df.show(5)
+ +---+
+ | 0|
+ +---+
+ | 0|
+ | 1|
+ | 2|
+ | 3|
+ | 4|
+ +---+
+ only showing top 5 rows
+
+ >>> def make_times_two_fn():
+ ... def predict(inputs: np.ndarray) -> np.ndarray:
+ ... # inputs.shape = [batch_size]
+ ... # outputs.shape = [batch_size]
+ ... return inputs * 2
+ ... return predict
+ >>>
+ >>> times_two_udf = predict_batch_udf(make_times_two_fn,
+ ... return_type=FloatType(),
+ ... batch_size=10)
+ >>>
+ >>> df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
+ >>> df.withColumn("x2", times_two_udf("0")).show(5)
+ +---+---+
+ | 0| x2|
+ +---+---+
+ | 0|0.0|
+ | 1|2.0|
+ | 2|4.0|
+ | 3|6.0|
+ | 4|8.0|
+ +---+---+
+ only showing top 5 rows
+
+ * Multiple scalar columns
+ Input DataFrame has multiple columns of scalar values. If the user-provided `predict`
+ function expects a single input, then the user must combine the multiple columns into a
+ single tensor using `pyspark.sql.functions.array`.
+
+ >>> import numpy as np
+ >>> import pandas as pd
+ >>> from pyspark.ml.functions import predict_batch_udf
+ >>> from pyspark.sql.functions import array
+ >>>
+ >>> data = np.arange(0, 1000, dtype=np.float64).reshape(-1, 4)
+ >>> pdf = pd.DataFrame(data, columns=['a','b','c','d'])
+ >>> df = spark.createDataFrame(pdf)
+ >>> df.show(5)
+ +----+----+----+----+
+ | a| b| c| d|
+ +----+----+----+----+
+ | 0.0| 1.0| 2.0| 3.0|
+ | 4.0| 5.0| 6.0| 7.0|
+ | 8.0| 9.0|10.0|11.0|
+ |12.0|13.0|14.0|15.0|
+ |16.0|17.0|18.0|19.0|
+ +----+----+----+----+
+ only showing top 5 rows
+
+ >>> def make_sum_fn():
+ ... def predict(inputs: np.ndarray) -> np.ndarray:
+ ... # inputs.shape = [batch_size, 4]
+ ... # outputs.shape = [batch_size]
+ ... return np.sum(inputs, axis=1)
+ ... return predict
+ >>>
+ >>> sum_udf = predict_batch_udf(make_sum_fn,
+ ... return_type=FloatType(),
+ ... batch_size=10,
+ ... input_tensor_shapes=[[4]])
+ >>>
+ >>> df.withColumn("sum", sum_udf(array("a", "b", "c", "d"))).show(5)
+ +----+----+----+----+----+
+ | a| b| c| d| sum|
+ +----+----+----+----+----+
+ | 0.0| 1.0| 2.0| 3.0| 6.0|
+ | 4.0| 5.0| 6.0| 7.0|22.0|
+ | 8.0| 9.0|10.0|11.0|38.0|
+ |12.0|13.0|14.0|15.0|54.0|
+ |16.0|17.0|18.0|19.0|70.0|
+ +----+----+----+----+----+
+ only showing top 5 rows
+
+ If the `predict` function expects multiple inputs, then the number of selected input columns
+ must match the number of expected inputs.
+
+ >>> def make_sum_fn():
+ ... def predict(x1: np.ndarray,
+ ... x2: np.ndarray,
+ ... x3: np.ndarray,
+ ... x4: np.ndarray) -> np.ndarray:
+ ... # xN.shape = [batch_size]
+ ... # outputs.shape = [batch_size]
+ ... return x1 + x2 + x3 + x4
+ ... return predict
+ >>>
+ >>> sum_udf = predict_batch_udf(make_sum_fn,
+ ... return_type=FloatType(),
+ ... batch_size=10)
+ >>>
+ >>> df.withColumn("sum", sum_udf("a", "b", "c", "d")).show(5)
+ +----+----+----+----+----+
+ | a| b| c| d| sum|
+ +----+----+----+----+----+
+ | 0.0| 1.0| 2.0| 3.0| 6.0|
+ | 4.0| 5.0| 6.0| 7.0|22.0|
+ | 8.0| 9.0|10.0|11.0|38.0|
+ |12.0|13.0|14.0|15.0|54.0|
+ |16.0|17.0|18.0|19.0|70.0|
+ +----+----+----+----+----+
+ only showing top 5 rows
+
+ * Multiple tensor columns
+ Input DataFrame has multiple columns, where each column is a tensor. The number of columns
+ should match the number of expected inputs for the user-provided `predict` function.
+
+ >>> import numpy as np
+ >>> import pandas as pd
+ >>> from pyspark.ml.functions import predict_batch_udf
+ >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+ >>> from typing import Mapping
+ >>>
+ >>> data = np.arange(0, 1000, dtype=np.float64).reshape(-1, 4)
+ >>> pdf = pd.DataFrame(data, columns=['a','b','c','d'])
+ >>> pdf_tensor = pd.DataFrame()
+ >>> pdf_tensor['t1'] = pdf.values.tolist()
+ >>> pdf_tensor['t2'] = pdf.drop(columns='d').values.tolist()
+ >>> df = spark.createDataFrame(pdf_tensor)
+ >>> df.show(5)
+ +--------------------+------------------+
+ | t1| t2|
+ +--------------------+------------------+
+ |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]|
+ |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|
+ |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|
+ |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|
+ |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|
+ +--------------------+------------------+
+ only showing top 5 rows
+
+ >>> def make_multi_sum_fn():
+ ... def predict(x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
+ ... # x1.shape = [batch_size, 4]
+ ... # x2.shape = [batch_size, 3]
+ ... # outputs.shape = [batch_size]
+ ... return np.sum(x1, axis=1) + np.sum(x2, axis=1)
+ ... return predict
+ >>>
+ >>> multi_sum_udf = predict_batch_udf(
+ ... make_multi_sum_fn,
+ ... return_type=FloatType(),
+ ... batch_size=5,
+ ... input_tensor_shapes=[[4], [3]],
+ ... )
+ >>>
+ >>> df.withColumn("sum", multi_sum_udf("t1", "t2")).show(5)
+ +--------------------+------------------+-----+
+ | t1| t2| sum|
+ +--------------------+------------------+-----+
+ |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 9.0|
+ |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]| 37.0|
+ |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]| 65.0|
+ |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]| 93.0|
+ |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|121.0|
+ +--------------------+------------------+-----+
+ only showing top 5 rows
+
+ * Multiple outputs
+ Some models can provide multiple outputs. These can be returned as a dictionary of named
+ values, which can be represented in either columnar or row-based formats.
+
+ >>> def make_multi_sum_fn():
+ ... def predict_columnar(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]:
+ ... # x1.shape = [batch_size, 4]
+ ... # x2.shape = [batch_size, 3]
+ ... return {
+ ... "sum1": np.sum(x1, axis=1),
+ ... "sum2": np.sum(x2, axis=1)
+ ... }
+ ... return predict_columnar
+ >>>
+ >>> multi_sum_udf = predict_batch_udf(
+ ... make_multi_sum_fn,
+ ... return_type=StructType([
+ ... StructField("sum1", FloatType(), True),
+ ... StructField("sum2", FloatType(), True)
+ ... ]),
+ ... batch_size=5,
+ ... input_tensor_shapes=[[4], [3]],
+ ... )
+ >>>
+ >>> df.withColumn("preds", multi_sum_udf("t1", "t2")).select("t1",
"t2", "preds.*").show(5)
+ +--------------------+------------------+----+----+
+ | t1| t2|sum1|sum2|
+ +--------------------+------------------+----+----+
+ |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 6.0| 3.0|
+ |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|22.0|15.0|
+ |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|38.0|27.0|
+ |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|54.0|39.0|
+ |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|70.0|51.0|
+ +--------------------+------------------+----+----+
+ only showing top 5 rows
+
+ >>> def make_multi_sum_fn():
+ ... def predict_row(x1: np.ndarray, x2: np.ndarray) -> list[Mapping[str, float]]:
+ ... # x1.shape = [batch_size, 4]
+ ... # x2.shape = [batch_size, 3]
+ ... return [{'sum1': np.sum(x1[i]), 'sum2': np.sum(x2[i])} for i in range(len(x1))]
+ ... return predict_row
+ >>>
+ >>> multi_sum_udf = predict_batch_udf(
+ ... make_multi_sum_fn,
+ ... return_type=StructType([
+ ... StructField("sum1", FloatType(), True),
+ ... StructField("sum2", FloatType(), True)
+ ... ]),
+ ... batch_size=5,
+ ... input_tensor_shapes=[[4], [3]],
+ ... )
+ >>>
+ >>> df.withColumn("sum", multi_sum_udf("t1", "t2")).select("t1", "t2",
"sum.*").show(5)
+ +--------------------+------------------+----+----+
+ | t1| t2|sum1|sum2|
+ +--------------------+------------------+----+----+
+ |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 6.0| 3.0|
+ |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|22.0|15.0|
+ |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|38.0|27.0|
+ |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|54.0|39.0|
+ |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|70.0|51.0|
+ +--------------------+------------------+----+----+
+ only showing top 5 rows
+
+ Note that the multiple outputs can be arrays as well.
+
+ >>> def make_multi_times_two_fn():
+ ... def predict(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]:
+ ... # x1.shape = [batch_size, 4]
+ ... # x2.shape = [batch_size, 3]
+ ... return {"t1x2": x1 * 2, "t2x2": x2 * 2}
+ ... return predict
+ >>>
+ >>> multi_times_two_udf = predict_batch_udf(
+ ... make_multi_times_two_fn,
+ ... return_type=StructType([
+ ... StructField("t1x2", ArrayType(FloatType()), True),
+ ... StructField("t2x2", ArrayType(FloatType()), True)
+ ... ]),
+ ... batch_size=5,
+ ... input_tensor_shapes=[[4], [3]],
+ ... )
+ >>>
+ >>> df.withColumn("x2", multi_times_two_udf("t1", "t2")).select("t1",
"t2", "x2.*").show(5)
+
+--------------------+------------------+--------------------+------------------+
+ | t1| t2| t1x2|
t2x2|
+
+--------------------+------------------+--------------------+------------------+
+ |[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]|[0.0, 2.0, 4.0, 6.0]| [0.0,
2.0, 4.0]|
+ |[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|[8.0, 10.0, 12.0,...| [8.0,
10.0, 12.0]|
+ |[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|[16.0, 18.0, 20.0...|[16.0,
18.0, 20.0]|
+ |[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|[24.0, 26.0, 28.0...|[24.0,
26.0, 28.0]|
+ |[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|[32.0, 34.0, 36.0...|[32.0,
34.0, 36.0]|
+
+--------------------+------------------+--------------------+------------------+
+ only showing top 5 rows
"""
# generate a new uuid each time this is invoked on the driver to invalidate executor-side cache.
model_uuid = uuid.uuid4()
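The caching pattern this enables is roughly the following (a sketch of the idea only, not Spark's actual internals; names are illustrative):

```python
import uuid

# executor-side cache of loaded predict functions, keyed by model_uuid.
# Since the driver mints a fresh uuid per predict_batch_udf() call, each new
# UDF forces every python worker to reload the model via make_predict_fn.
_predict_fn_cache = {}

def _get_or_load(model_uuid: uuid.UUID, make_predict_fn):
    if model_uuid not in _predict_fn_cache:
        _predict_fn_cache[model_uuid] = make_predict_fn()
    return _predict_fn_cache[model_uuid]
```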