This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a9ed01f  [SPARK-26412][PYTHON][FOLLOW-UP] Improve error messages in 
Scalar iterator pandas UDF
a9ed01f is described below

commit a9ed01f05772c31362e56df59f396b824acbe27b
Author: HyukjinKwon <[email protected]>
AuthorDate: Thu Apr 9 13:14:41 2020 +0900

    [SPARK-26412][PYTHON][FOLLOW-UP] Improve error messages in Scalar iterator 
pandas UDF
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to improve the error message from Scalar iterator pandas 
UDF.
    
    ### Why are the changes needed?
    
    To show the correct error messages.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, but only in unreleased branches.
    
    ```python
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    @pandas_udf('long', PandasUDFType.SCALAR_ITER)
    def pandas_plus_one(iterator):
          for _ in iterator:
                yield pd.Series(1)
    
    spark.range(10).repartition(1).select(pandas_plus_one("id")).show()
    ```
    ```python
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    @pandas_udf('long', PandasUDFType.SCALAR_ITER)
    def pandas_plus_one(iterator):
          for _ in iterator:
                yield pd.Series(list(range(20)))
    
    spark.range(10).repartition(1).select(pandas_plus_one("id")).show()
    ```
    
    **Before:**
    
    ```
    RuntimeError: The number of output rows of pandas iterator UDF should
    be the same with input rows. The input rows number is 10 but the output
    rows number is 1.
    ```
    ```
    AssertionError: Pandas MAP_ITER UDF outputted more rows than input rows.
    ```
    
    **After:**
    
    ```
    RuntimeError: The length of output in Scalar iterator pandas UDF should be
    the same with the input's; however, the length of output was 1 and the 
length
    of input was 10.
    ```
    ```
    AssertionError: Pandas SCALAR_ITER UDF outputted more rows than input rows.
    ```
    
    ### How was this patch tested?
    
    Unittests were fixed accordingly.
    
    Closes #28135 from HyukjinKwon/SPARK-26412-followup.
    
    Authored-by: HyukjinKwon <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit 4fafdcd63b8c4961d7788c13047636ac3d26bfb9)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 python/pyspark/sql/tests/test_pandas_udf_scalar.py |  6 +++---
 python/pyspark/worker.py                           | 16 ++++++++--------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index b07de3c..7260e80 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -445,8 +445,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     Exception,
-                    "The number of output rows of pandas iterator UDF should 
be "
-                    "the same with input rows"):
+                    "The length of output in Scalar iterator.*"
+                    "the length of output was 1"):
                 df.select(iter_udf_wong_output_size(col('id'))).collect()
 
         @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER)
@@ -461,7 +461,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             with QuietTest(self.sc):
                 with self.assertRaisesRegexp(
                         Exception,
-                        "SQL_SCALAR_PANDAS_ITER_UDF should exhaust the input 
iterator"):
+                        "pandas iterator UDF should exhaust"):
                     
df1.select(iter_udf_not_reading_all_input(col('id'))).collect()
 
     def test_vectorized_udf_chained(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 7d62bf1..10764728 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -341,7 +341,7 @@ def read_udfs(pickleSer, infile, eval_type):
             pickleSer, infile, eval_type, runner_conf, udf_index=0)
 
         def func(_, iterator):
-            num_input_rows = [0]
+            num_input_rows = [0]  # TODO(SPARK-29909): Use nonlocal after we 
drop Python 2.
 
             def map_batch(batch):
                 udf_args = [batch[offset] for offset in arg_offsets]
@@ -358,7 +358,7 @@ def read_udfs(pickleSer, infile, eval_type):
             for result_batch, result_type in result_iter:
                 num_output_rows += len(result_batch)
                 assert is_map_iter or num_output_rows <= num_input_rows[0], \
-                    "Pandas MAP_ITER UDF outputted more rows than input rows."
+                    "Pandas SCALAR_ITER UDF outputted more rows than input 
rows."
                 yield (result_batch, result_type)
 
             if is_scalar_iter:
@@ -367,14 +367,14 @@ def read_udfs(pickleSer, infile, eval_type):
                 except StopIteration:
                     pass
                 else:
-                    raise RuntimeError("SQL_SCALAR_PANDAS_ITER_UDF should 
exhaust the input "
+                    raise RuntimeError("pandas iterator UDF should exhaust the 
input "
                                        "iterator.")
 
-            if is_scalar_iter and num_output_rows != num_input_rows[0]:
-                raise RuntimeError("The number of output rows of pandas 
iterator UDF should be "
-                                   "the same with input rows. The input rows 
number is %d but the "
-                                   "output rows number is %d." %
-                                   (num_input_rows[0], num_output_rows))
+                if num_output_rows != num_input_rows[0]:
+                    raise RuntimeError(
+                        "The length of output in Scalar iterator pandas UDF 
should be "
+                        "the same with the input's; however, the length of 
output was %d and the "
+                        "length of input was %d." % (num_output_rows, 
num_input_rows[0]))
 
         # profiling is not supported for UDF
         return func, None, ser, ser


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to