This is an automated email from the ASF dual-hosted git repository.
zhengruifeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 31fe6dd44626 [SPARK-56929][PYTHON] Pass prefers_large_types when
building expected schema for Arrow grouped/cogrouped map UDFs
31fe6dd44626 is described below
commit 31fe6dd44626ac9934da887a0309fccc7e15706a
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 19 09:31:33 2026 +0800
[SPARK-56929][PYTHON] Pass prefers_large_types when building expected
schema for Arrow grouped/cogrouped map UDFs
### What changes were proposed in this pull request?
Forward `prefers_large_types=runner_conf.use_large_var_types` when building
`expected_cols_and_types` in `python/pyspark/worker.py` for
`SQL_GROUPED_MAP_ARROW_UDF`, `SQL_GROUPED_MAP_ARROW_ITER_UDF`, and
`SQL_COGROUPED_MAP_ARROW_UDF`. The matching `arrow_return_type` already
forwards the flag; the per-field expected schema was missing it.
### Why are the changes needed?
With `spark.sql.execution.arrow.useLargeVarTypes=true`, the result table
contains `large_string`/`large_binary` (per `arrow_return_type`) while the
expected schema contains plain `string`/`binary`, so `verify_arrow_result`
raises a spurious `RESULT_COLUMN_TYPES_MISMATCH`:
```python
spark.conf.set("spark.sql.execution.arrow.useLargeVarTypes", True)
df = spark.createDataFrame([(0, "foo", b"foo")], "id long, s string, b binary")
df.groupBy("id").applyInArrow(lambda t: t, "id long, s string, b binary").collect()
# [RESULT_COLUMN_TYPES_MISMATCH] column 's' (expected string, actual large_string), ...
```
Pre-requisite for SPARK-56608.
### Does this PR introduce _any_ user-facing change?
Yes. `applyInArrow` (grouped and cogrouped, iterator and non-iterator) no
longer raises a spurious `RESULT_COLUMN_TYPES_MISMATCH` under
`useLargeVarTypes=true`. Default behavior unchanged.
### How was this patch tested?
Added `test_apply_in_arrow_large_var_types` to `test_arrow_grouped_map.py`
and `test_arrow_cogrouped_map.py`, covering name-based and positional
assignment for all three eval types (Spark Connect parity tests pick them up
via the mixins). Confirmed the new tests fail on master without the worker.py
change and pass with it.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #55961 from Yicong-Huang/fix-arrow-map-large-var-types.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../sql/tests/arrow/test_arrow_cogrouped_map.py | 31 ++++++++++++++
.../sql/tests/arrow/test_arrow_grouped_map.py | 26 ++++++++++++
python/pyspark/worker.py | 48 +++++++++++++++++++---
3 files changed, 99 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
index 43577f690538..cfeba6cbc316 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
@@ -121,6 +121,37 @@ class CogroupedMapInArrowTestsMixin:
cogrouped_df = grouped_left_df.cogroup(grouped_right_df)
self.do_test_apply_in_arrow(cogrouped_df, key_column=None)
+ def test_apply_in_arrow_large_var_types(self):
+ # SPARK-56929: when useLargeVarTypes=true, the expected schema computed by
+ # worker.py for result validation must also use large_string/large_binary,
+ # otherwise verify_arrow_result raises a spurious RESULT_COLUMN_TYPES_MISMATCH.
+ left = self.spark.createDataFrame(
+ [(0, "foo", b"foo"), (1, None, None)], "id long, s string, b binary"
+ )
+ right = self.spark.createDataFrame(
+ [(0, "bar", b"bar"), (1, "baz", b"baz")], "id long, s string, b binary"
+ )
+ schema = "s string, b binary"
+
+ def func(left_tbl, right_tbl):
+ assert pa.types.is_large_string(left_tbl.schema.field("s").type)
+ assert pa.types.is_large_binary(left_tbl.schema.field("b").type)
+ return left_tbl.select(["s", "b"])
+
+ expected = left.select("s", "b")
+ for assign_cols_by_name in [True, False]:
+ with self.subTest(assign_cols_by_name=assign_cols_by_name):
+ with self.sql_conf(
+ {
+ "spark.sql.execution.arrow.useLargeVarTypes": True,
+ "spark.sql.legacy.execution.pandas.groupedMap."
+ "assignColumnsByName": assign_cols_by_name,
+ }
+ ):
+ cogrouped = left.groupBy("id").cogroup(right.groupBy("id"))
+ actual = cogrouped.applyInArrow(func, schema)
+ assertDataFrameEqual(actual, expected)
+
def test_apply_in_arrow_not_returning_arrow_table(self):
def func(key, left, right):
return key
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index b10d1cf4d221..cefce8d0cf65 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -101,6 +101,32 @@ class ApplyInArrowTestsMixin:
actual2 = grouped_df.applyInArrow(func_variation, "id long, value long").collect()
self.assertEqual(actual2, expected)
+ def test_apply_in_arrow_large_var_types(self):
+ # SPARK-56929: when useLargeVarTypes=true, the expected schema computed by
+ # worker.py for result validation must also use large_string/large_binary,
+ # otherwise verify_arrow_result raises a spurious RESULT_COLUMN_TYPES_MISMATCH.
+ data = [(0, "foo", b"foo"), (0, "bar", b"bar"), (1, None, None), (1, "baz", b"baz")]
+ df = self.spark.createDataFrame(data, "id long, s string, b binary")
+ schema = "id long, s string, b binary"
+
+ def func(table):
+ assert pa.types.is_large_string(table.schema.field("s").type)
+ assert pa.types.is_large_binary(table.schema.field("b").type)
+ return table
+
+ for assign_cols_by_name in [True, False]:
+ with self.subTest(assign_cols_by_name=assign_cols_by_name):
+ with self.sql_conf(
+ {
+ "spark.sql.execution.arrow.useLargeVarTypes": True,
+ "spark.sql.legacy.execution.pandas.groupedMap."
+ "assignColumnsByName": assign_cols_by_name,
+ }
+ ):
+ for func_variation in function_variations(func):
+ actual = df.groupby("id").applyInArrow(func_variation, schema)
+ assertDataFrameEqual(actual, df)
+
def test_apply_in_arrow_empty_groupby(self):
df = self.data
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index cb7310589540..2d877565f55c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2653,11 +2653,23 @@ def read_udfs(pickleSer, udf_info_list, eval_type, runner_conf, eval_conf):
)
if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
- col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+ col.name: to_arrow_type(
+ col.dataType,
+ timezone="UTC",
+ prefers_large_types=runner_conf.use_large_var_types,
+ )
+ for col in return_type.fields
}
else:
expected_cols_and_types = [
- (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+ (
+ col.name,
+ to_arrow_type(
+ col.dataType,
+ timezone="UTC",
+ prefers_large_types=runner_conf.use_large_var_types,
+ ),
+ )
for col in return_type.fields
]
@@ -2726,11 +2738,23 @@ def read_udfs(pickleSer, udf_info_list, eval_type, runner_conf, eval_conf):
)
if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
- col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+ col.name: to_arrow_type(
+ col.dataType,
+ timezone="UTC",
+ prefers_large_types=runner_conf.use_large_var_types,
+ )
+ for col in return_type.fields
}
else:
expected_cols_and_types = [
- (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+ (
+ col.name,
+ to_arrow_type(
+ col.dataType,
+ timezone="UTC",
+ prefers_large_types=runner_conf.use_large_var_types,
+ ),
+ )
for col in return_type.fields
]
@@ -2945,12 +2969,24 @@ def read_udfs(pickleSer, udf_info_list, eval_type, runner_conf, eval_conf):
# columns must raise; no silent coercion.
if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
- col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+ col.name: to_arrow_type(
+ col.dataType,
+ timezone="UTC",
+ prefers_large_types=runner_conf.use_large_var_types,
+ )
+ for col in return_type.fields
}
reorder_names = [col.name for col in return_type.fields]
else:
expected_cols_and_types = [
- (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+ (
+ col.name,
+ to_arrow_type(
+ col.dataType,
+ timezone="UTC",
+ prefers_large_types=runner_conf.use_large_var_types,
+ ),
+ )
for col in return_type.fields
]
reorder_names = None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]