This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4f71231 [SPARK-33073][PYTHON] Improve error handling on Pandas to
Arrow conversion failures
4f71231 is described below
commit 4f71231af51a3da5d7964a218a878c0cf3037c10
Author: Bryan Cutler <[email protected]>
AuthorDate: Tue Oct 6 18:11:24 2020 +0900
[SPARK-33073][PYTHON] Improve error handling on Pandas to Arrow conversion
failures
### What changes were proposed in this pull request?
This improves error handling when a conversion from Pandas to Arrow fails.
It also fixes tests to be compatible with the upcoming Arrow 2.0.0
release.
### Why are the changes needed?
Current tests will fail with Arrow 2.0.0 because of a change in error
message when the schema is invalid. For these cases, the current error message
also includes information on disabling safe conversion config, which is mainly
meant for floating point truncation and overflow. The tests have been updated
to use a message that is shown for both past and upcoming Arrow versions.
If the user enters an invalid schema, the error produced by pyarrow is not
consistent: it is either a `TypeError` or an `ArrowInvalid`, with only the
latter being caught and re-raised as a `RuntimeError` with the extra info.
The error handling is improved by:
- narrowing the caught exception type to `ValueError`, of which `ArrowInvalid`
is a subclass and which is what is raised on safe conversion failures.
- The exception is only raised with additional information on disabling
"spark.sql.execution.pandas.convertToArrowArraySafely" if it is enabled in the
first place.
- The original exception is chained to better show it to the user.
### Does this PR introduce _any_ user-facing change?
Yes, the re-raised error changes from a RuntimeError to a ValueError, which
better categorizes this type of error and is in line with the original Arrow error.
### How was this patch tested?
Existing tests, using pyarrow 1.0.1 and 2.0.0-snapshot
Closes #29951 from
BryanCutler/arrow-better-handle-pandas-errors-SPARK-33073.
Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 0812d6c17cc4876bb87a9d1fec35ec8c7b2365f0)
Signed-off-by: HyukjinKwon <[email protected]>
---
python/pyspark/sql/pandas/serializers.py | 17 ++++++++++-------
python/pyspark/sql/tests/test_arrow.py | 9 +++++----
python/pyspark/sql/tests/test_pandas_grouped_map.py | 15 ++++++++-------
3 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py
b/python/pyspark/sql/pandas/serializers.py
index 4dd15d1..b164a38 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -156,13 +156,16 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
s = _check_series_convert_timestamps_internal(s,
self._timezone)
try:
array = pa.Array.from_pandas(s, mask=mask, type=t,
safe=self._safecheck)
- except pa.ArrowException as e:
- error_msg = "Exception thrown when converting pandas.Series
(%s) to Arrow " + \
- "Array (%s). It can be caused by overflows or
other unsafe " + \
- "conversions warned by Arrow. Arrow safe type
check can be " + \
- "disabled by using SQL config " + \
-
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
- raise RuntimeError(error_msg % (s.dtype, t), e)
+ except ValueError as e:
+ if self._safecheck:
+ error_msg = "Exception thrown when converting
pandas.Series (%s) to " + \
+ "Arrow Array (%s). It can be caused by
overflows or other " + \
+ "unsafe conversions warned by Arrow. Arrow
safe type check " + \
+ "can be disabled by using SQL config " + \
+
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
+ raise ValueError(error_msg % (s.dtype, t)) from e
+ else:
+ raise e
return array
arrs = []
diff --git a/python/pyspark/sql/tests/test_arrow.py
b/python/pyspark/sql/tests/test_arrow.py
index 15c5cf1..2c6231d 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -266,11 +266,12 @@ class ArrowTests(ReusedSQLTestCase):
def test_createDataFrame_with_incorrect_schema(self):
pdf = self.create_pandas_data_frame()
fields = list(self.schema)
- fields[0], fields[1] = fields[1], fields[0] # swap str with int
+ fields[5], fields[6] = fields[6], fields[5] # swap decimal with date
wrong_schema = StructType(fields)
- with QuietTest(self.sc):
- with self.assertRaisesRegexp(Exception, "integer.*required"):
- self.spark.createDataFrame(pdf, schema=wrong_schema)
+ with
self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
+ with QuietTest(self.sc):
+ with self.assertRaisesRegexp(Exception,
"[D|d]ecimal.*got.*date"):
+ self.spark.createDataFrame(pdf, schema=wrong_schema)
def test_createDataFrame_with_names(self):
pdf = self.create_pandas_data_frame()
diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py
b/python/pyspark/sql/tests/test_pandas_grouped_map.py
index cc6167e..8e02b29 100644
--- a/python/pyspark/sql/tests/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py
@@ -450,15 +450,16 @@ class GroupedMapInPandasTests(ReusedSQLTestCase):
def column_name_typo(pdf):
return pd.DataFrame({'iid': pdf.id, 'v': pdf.v})
- @pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP)
+ @pandas_udf('id long, v decimal', PandasUDFType.GROUPED_MAP)
def invalid_positional_types(pdf):
- return pd.DataFrame([(u'a', 1.2)])
+ return pd.DataFrame([(1, datetime.date(2020, 10, 5))])
- with QuietTest(self.sc):
- with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
- grouped_df.apply(column_name_typo).collect()
- with self.assertRaisesRegexp(Exception, "an integer is required"):
- grouped_df.apply(invalid_positional_types).collect()
+ with
self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
+ with QuietTest(self.sc):
+ with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
+ grouped_df.apply(column_name_typo).collect()
+ with self.assertRaisesRegexp(Exception,
"[D|d]ecimal.*got.*date"):
+ grouped_df.apply(invalid_positional_types).collect()
def test_positional_assignment_conf(self):
with self.sql_conf({
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]