This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a9ed01f [SPARK-26412][PYTHON][FOLLOW-UP] Improve error messages in
Scalar iterator pandas UDF
a9ed01f is described below
commit a9ed01f05772c31362e56df59f396b824acbe27b
Author: HyukjinKwon <[email protected]>
AuthorDate: Thu Apr 9 13:14:41 2020 +0900
[SPARK-26412][PYTHON][FOLLOW-UP] Improve error messages in Scalar iterator
pandas UDF
### What changes were proposed in this pull request?
This PR proposes to improve the error messages from the Scalar iterator
pandas UDF.
### Why are the changes needed?
To show the correct error messages.
### Does this PR introduce any user-facing change?
Yes, but only in unreleased branches.
```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
for _ in iterator:
yield pd.Series(1)
spark.range(10).repartition(1).select(pandas_plus_one("id")).show()
```
```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
for _ in iterator:
yield pd.Series(list(range(20)))
spark.range(10).repartition(1).select(pandas_plus_one("id")).show()
```
**Before:**
```
RuntimeError: The number of output rows of pandas iterator UDF should
be the same with input rows. The input rows number is 10 but the output
rows number is 1.
```
```
AssertionError: Pandas MAP_ITER UDF outputted more rows than input rows.
```
**After:**
```
RuntimeError: The length of output in Scalar iterator pandas UDF should be
the same with the input's; however, the length of output was 1 and the
length
of input was 10.
```
```
AssertionError: Pandas SCALAR_ITER UDF outputted more rows than input rows.
```
### How was this patch tested?
Unittests were fixed accordingly.
Closes #28135 from HyukjinKwon/SPARK-26412-followup.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 4fafdcd63b8c4961d7788c13047636ac3d26bfb9)
Signed-off-by: HyukjinKwon <[email protected]>
---
python/pyspark/sql/tests/test_pandas_udf_scalar.py | 6 +++---
python/pyspark/worker.py | 16 ++++++++--------
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index b07de3c..7260e80 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -445,8 +445,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
- "The number of output rows of pandas iterator UDF should
be "
- "the same with input rows"):
+ "The length of output in Scalar iterator.*"
+ "the length of output was 1"):
df.select(iter_udf_wong_output_size(col('id'))).collect()
@pandas_udf(LongType(), PandasUDFType.SCALAR_ITER)
@@ -461,7 +461,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
- "SQL_SCALAR_PANDAS_ITER_UDF should exhaust the input
iterator"):
+ "pandas iterator UDF should exhaust"):
df1.select(iter_udf_not_reading_all_input(col('id'))).collect()
def test_vectorized_udf_chained(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 7d62bf1..10764728 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -341,7 +341,7 @@ def read_udfs(pickleSer, infile, eval_type):
pickleSer, infile, eval_type, runner_conf, udf_index=0)
def func(_, iterator):
- num_input_rows = [0]
+ num_input_rows = [0] # TODO(SPARK-29909): Use nonlocal after we
drop Python 2.
def map_batch(batch):
udf_args = [batch[offset] for offset in arg_offsets]
@@ -358,7 +358,7 @@ def read_udfs(pickleSer, infile, eval_type):
for result_batch, result_type in result_iter:
num_output_rows += len(result_batch)
assert is_map_iter or num_output_rows <= num_input_rows[0], \
- "Pandas MAP_ITER UDF outputted more rows than input rows."
+ "Pandas SCALAR_ITER UDF outputted more rows than input
rows."
yield (result_batch, result_type)
if is_scalar_iter:
@@ -367,14 +367,14 @@ def read_udfs(pickleSer, infile, eval_type):
except StopIteration:
pass
else:
- raise RuntimeError("SQL_SCALAR_PANDAS_ITER_UDF should
exhaust the input "
+ raise RuntimeError("pandas iterator UDF should exhaust the
input "
"iterator.")
- if is_scalar_iter and num_output_rows != num_input_rows[0]:
- raise RuntimeError("The number of output rows of pandas
iterator UDF should be "
- "the same with input rows. The input rows
number is %d but the "
- "output rows number is %d." %
- (num_input_rows[0], num_output_rows))
+ if num_output_rows != num_input_rows[0]:
+ raise RuntimeError(
+ "The length of output in Scalar iterator pandas UDF
should be "
+ "the same with the input's; however, the length of
output was %d and the "
+ "length of input was %d." % (num_output_rows,
num_input_rows[0]))
# profiling is not supported for UDF
return func, None, ser, ser
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]