This is an automated email from the ASF dual-hosted git repository.

zhengruifeng pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 87ff2a97fada [SPARK-56929][PYTHON] Pass prefers_large_types when building expected schema for Arrow grouped/cogrouped map UDFs
87ff2a97fada is described below

commit 87ff2a97fada854fe14a69a9e076302cd80de095
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 19 09:31:33 2026 +0800

    [SPARK-56929][PYTHON] Pass prefers_large_types when building expected schema for Arrow grouped/cogrouped map UDFs
    
    ### What changes were proposed in this pull request?
    
    Forward `prefers_large_types=runner_conf.use_large_var_types` when building `expected_cols_and_types` in `python/pyspark/worker.py` for `SQL_GROUPED_MAP_ARROW_UDF`, `SQL_GROUPED_MAP_ARROW_ITER_UDF`, and `SQL_COGROUPED_MAP_ARROW_UDF`. The matching `arrow_return_type` already forwards the flag; the per-field expected schema was missing it.
    
    ### Why are the changes needed?
    
    With `spark.sql.execution.arrow.useLargeVarTypes=true`, the result table contains `large_string`/`large_binary` (per `arrow_return_type`) while the expected schema contains plain `string`/`binary`, so `verify_arrow_result` raises a spurious `RESULT_COLUMN_TYPES_MISMATCH`:
    
    ```python
    spark.conf.set("spark.sql.execution.arrow.useLargeVarTypes", True)
    df = spark.createDataFrame([(0, "foo", b"foo")], "id long, s string, b binary")
    df.groupBy("id").applyInArrow(lambda t: t, "id long, s string, b binary").collect()
    # [RESULT_COLUMN_TYPES_MISMATCH] column 's' (expected string, actual large_string), ...
    ```
    
    Pre-requisite for SPARK-56608.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `applyInArrow` (grouped and cogrouped, iterator and non-iterator) no longer raises a spurious `RESULT_COLUMN_TYPES_MISMATCH` under `useLargeVarTypes=true`. Default behavior unchanged.
    
    ### How was this patch tested?
    
    Added `test_apply_in_arrow_large_var_types` to `test_arrow_grouped_map.py` and `test_arrow_cogrouped_map.py`, covering name-based and positional assignment for all three eval types (Spark Connect parity tests pick them up via the mixins). Confirmed the new tests fail on master without the `worker.py` change and pass with it.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #55961 from Yicong-Huang/fix-arrow-map-large-var-types.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 31fe6dd44626ac9934da887a0309fccc7e15706a)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../sql/tests/arrow/test_arrow_cogrouped_map.py    | 31 ++++++++++++++
 .../sql/tests/arrow/test_arrow_grouped_map.py      | 26 ++++++++++++
 python/pyspark/worker.py                           | 48 +++++++++++++++++++---
 3 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
index 43577f690538..cfeba6cbc316 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
@@ -121,6 +121,37 @@ class CogroupedMapInArrowTestsMixin:
         cogrouped_df = grouped_left_df.cogroup(grouped_right_df)
         self.do_test_apply_in_arrow(cogrouped_df, key_column=None)
 
+    def test_apply_in_arrow_large_var_types(self):
+        # SPARK-56929: when useLargeVarTypes=true, the expected schema computed by
+        # worker.py for result validation must also use large_string/large_binary,
+        # otherwise verify_arrow_result raises a spurious RESULT_COLUMN_TYPES_MISMATCH.
+        left = self.spark.createDataFrame(
+            [(0, "foo", b"foo"), (1, None, None)], "id long, s string, b binary"
+        )
+        right = self.spark.createDataFrame(
+            [(0, "bar", b"bar"), (1, "baz", b"baz")], "id long, s string, b binary"
+        )
+        schema = "s string, b binary"
+
+        def func(left_tbl, right_tbl):
+            assert pa.types.is_large_string(left_tbl.schema.field("s").type)
+            assert pa.types.is_large_binary(left_tbl.schema.field("b").type)
+            return left_tbl.select(["s", "b"])
+
+        expected = left.select("s", "b")
+        for assign_cols_by_name in [True, False]:
+            with self.subTest(assign_cols_by_name=assign_cols_by_name):
+                with self.sql_conf(
+                    {
+                        "spark.sql.execution.arrow.useLargeVarTypes": True,
+                        "spark.sql.legacy.execution.pandas.groupedMap."
+                        "assignColumnsByName": assign_cols_by_name,
+                    }
+                ):
+                    cogrouped = left.groupBy("id").cogroup(right.groupBy("id"))
+                    actual = cogrouped.applyInArrow(func, schema)
+                    assertDataFrameEqual(actual, expected)
+
     def test_apply_in_arrow_not_returning_arrow_table(self):
         def func(key, left, right):
             return key
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index b10d1cf4d221..cefce8d0cf65 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -101,6 +101,32 @@ class ApplyInArrowTestsMixin:
             actual2 = grouped_df.applyInArrow(func_variation, "id long, value long").collect()
             self.assertEqual(actual2, expected)
 
+    def test_apply_in_arrow_large_var_types(self):
+        # SPARK-56929: when useLargeVarTypes=true, the expected schema computed by
+        # worker.py for result validation must also use large_string/large_binary,
+        # otherwise verify_arrow_result raises a spurious RESULT_COLUMN_TYPES_MISMATCH.
+        data = [(0, "foo", b"foo"), (0, "bar", b"bar"), (1, None, None), (1, "baz", b"baz")]
+        df = self.spark.createDataFrame(data, "id long, s string, b binary")
+        schema = "id long, s string, b binary"
+
+        def func(table):
+            assert pa.types.is_large_string(table.schema.field("s").type)
+            assert pa.types.is_large_binary(table.schema.field("b").type)
+            return table
+
+        for assign_cols_by_name in [True, False]:
+            with self.subTest(assign_cols_by_name=assign_cols_by_name):
+                with self.sql_conf(
+                    {
+                        "spark.sql.execution.arrow.useLargeVarTypes": True,
+                        "spark.sql.legacy.execution.pandas.groupedMap."
+                        "assignColumnsByName": assign_cols_by_name,
+                    }
+                ):
+                    for func_variation in function_variations(func):
+                        actual = df.groupby("id").applyInArrow(func_variation, schema)
+                        assertDataFrameEqual(actual, df)
+
     def test_apply_in_arrow_empty_groupby(self):
         df = self.data
 
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4aa7bc68f3d5..c24b7b89ddce 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2689,11 +2689,23 @@ def read_udfs(pickleSer, udf_info_list, eval_type, runner_conf, eval_conf):
         )
         if runner_conf.assign_cols_by_name:
             expected_cols_and_types = {
-                col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+                col.name: to_arrow_type(
+                    col.dataType,
+                    timezone="UTC",
+                    prefers_large_types=runner_conf.use_large_var_types,
+                )
+                for col in return_type.fields
             }
         else:
             expected_cols_and_types = [
-                (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+                (
+                    col.name,
+                    to_arrow_type(
+                        col.dataType,
+                        timezone="UTC",
+                        prefers_large_types=runner_conf.use_large_var_types,
+                    ),
+                )
                 for col in return_type.fields
             ]
 
@@ -2762,11 +2774,23 @@ def read_udfs(pickleSer, udf_info_list, eval_type, runner_conf, eval_conf):
         )
         if runner_conf.assign_cols_by_name:
             expected_cols_and_types = {
-                col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+                col.name: to_arrow_type(
+                    col.dataType,
+                    timezone="UTC",
+                    prefers_large_types=runner_conf.use_large_var_types,
+                )
+                for col in return_type.fields
             }
         else:
             expected_cols_and_types = [
-                (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+                (
+                    col.name,
+                    to_arrow_type(
+                        col.dataType,
+                        timezone="UTC",
+                        prefers_large_types=runner_conf.use_large_var_types,
+                    ),
+                )
                 for col in return_type.fields
             ]
 
@@ -2981,12 +3005,24 @@ def read_udfs(pickleSer, udf_info_list, eval_type, runner_conf, eval_conf):
         # columns must raise; no silent coercion.
         if runner_conf.assign_cols_by_name:
             expected_cols_and_types = {
-                col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+                col.name: to_arrow_type(
+                    col.dataType,
+                    timezone="UTC",
+                    prefers_large_types=runner_conf.use_large_var_types,
+                )
+                for col in return_type.fields
             }
             reorder_names = [col.name for col in return_type.fields]
         else:
             expected_cols_and_types = [
-                (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+                (
+                    col.name,
+                    to_arrow_type(
+                        col.dataType,
+                        timezone="UTC",
+                        prefers_large_types=runner_conf.use_large_var_types,
+                    ),
+                )
                 for col in return_type.fields
             ]
             reorder_names = None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to