This is an automated email from the ASF dual-hosted git repository.

zhengruifeng pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new d3d88250b68e [SPARK-56937][PYTHON] Raise error on wrong column count in Arrow grouped/cogrouped map UDF
d3d88250b68e is described below

commit d3d88250b68ee0c4b8a388577e5be3b921ac01f3
Author: Yicong Huang <[email protected]>
AuthorDate: Wed May 20 09:14:11 2026 +0800

    [SPARK-56937][PYTHON] Raise error on wrong column count in Arrow grouped/cogrouped map UDF
    
    ### What changes were proposed in this pull request?
    
    In `verify_arrow_result` (`python/pyspark/worker.py`), the positional
    branch zips the expected and actual columns without checking that their
    counts match, so any mismatch is silently truncated to the shorter list.
    This PR raises `RESULT_COLUMN_SCHEMA_MISMATCH` when the counts differ.
    
    ### Why are the changes needed?
    
    Latent since SPARK-40559. With `assignColumnsByName=false`, a UDF that
    returns the wrong number of columns either silently drops data (too many
    columns) or surfaces a JVM `ArrayIndexOutOfBoundsException` (too few
    columns). The name-based branch already raises a descriptive error; the
    positional branch should behave the same way.
    
    Affects `SQL_GROUPED_MAP_ARROW_UDF`, `SQL_GROUPED_MAP_ARROW_ITER_UDF`, and
    `SQL_COGROUPED_MAP_ARROW_UDF`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. In positional mode, returning the wrong number of columns now raises
    `RESULT_COLUMN_SCHEMA_MISMATCH` instead of silently dropping columns or
    failing with a JVM error.
    
    ### How was this patch tested?
    
    Added
    `test_apply_in_arrow_returning_wrong_column_count_positional_assignment`
    to `test_arrow_grouped_map.py` (which also covers the iterator variant via
    `function_variations`) and to `test_arrow_cogrouped_map.py`, exercising
    both the too-many and too-few column cases. The full grouped/cogrouped
    Arrow map test suites pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #55978 from Yicong-Huang/SPARK-56937.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 4306d0265a5a07a82e849061d14d61ca898b62a8)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../sql/tests/arrow/test_arrow_cogrouped_map.py    | 28 ++++++++++++++++++
 .../sql/tests/arrow/test_arrow_grouped_map.py      | 33 ++++++++++++++++++++++
 python/pyspark/worker.py                           |  8 ++++++
 3 files changed, 69 insertions(+)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py 
b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
index cfeba6cbc316..5b272f89bb5d 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
@@ -230,6 +230,34 @@ class CogroupedMapInArrowTestsMixin:
                 # stats returns three columns while here we set schema with 
two columns
                 self.cogrouped.applyInArrow(stats, schema="id long, m 
double").collect()
 
+    def 
test_apply_in_arrow_returning_wrong_column_count_positional_assignment(self):
+        def too_many_cols(key, left, right):
+            return pa.Table.from_pydict(
+                {
+                    "a": [key[0].as_py()],
+                    "b": [pc.mean(left.column("v")).as_py()],
+                    "c": [pc.mean(right.column("v")).as_py()],
+                }
+            )
+
+        def too_few_cols(key, left, right):
+            return pa.Table.from_pydict({"a": [key[0].as_py()]})
+
+        with self.sql_conf(
+            
{"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}
+        ):
+            with self.quiet():
+                for func, expected, actual in [
+                    (too_many_cols, 2, 3),
+                    (too_few_cols, 2, 1),
+                ]:
+                    with self.subTest(func=func.__name__):
+                        with self.assertRaisesRegex(
+                            PythonException,
+                            rf"Expected: {expected}.*Actual: {actual}",
+                        ):
+                            self.cogrouped.applyInArrow(func, schema="a long, 
b double").collect()
+
     def test_apply_in_arrow_returning_empty_dataframe(self):
         def odd_means(key, left, right):
             if key[0].as_py() == 0:
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py 
b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index cefce8d0cf65..e0d40cfebe59 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -254,6 +254,39 @@ class ApplyInArrowTestsMixin:
                         func_variation, schema="id long, m double"
                     ).collect()
 
+    def 
test_apply_in_arrow_returning_wrong_column_count_positional_assignment(self):
+        df = self.data
+
+        def too_many_cols(key, table):
+            return pa.Table.from_pydict(
+                {
+                    "a": [key[0].as_py()],
+                    "b": [pc.mean(table.column("v")).as_py()],
+                    "c": [pc.stddev(table.column("v")).as_py()],
+                }
+            )
+
+        def too_few_cols(key, table):
+            return pa.Table.from_pydict({"a": [key[0].as_py()]})
+
+        with self.sql_conf(
+            
{"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}
+        ):
+            with self.quiet():
+                for func, expected, actual in [
+                    (too_many_cols, 2, 3),
+                    (too_few_cols, 2, 1),
+                ]:
+                    with self.subTest(func=func.__name__):
+                        for func_variation in function_variations(func):
+                            with self.assertRaisesRegex(
+                                PythonException,
+                                rf"Expected: {expected}.*Actual: {actual}",
+                            ):
+                                df.groupby("id").applyInArrow(
+                                    func_variation, schema="a long, b double"
+                                ).collect()
+
     def test_apply_in_arrow_returning_empty_dataframe(self):
         df = self.data
 
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c24b7b89ddce..ad4742bfb6c7 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -583,6 +583,14 @@ def verify_arrow_result(result, assign_cols_by_name, 
expected_cols_and_types):
             actual_cols_and_types = [
                 (name, dataType) for name, dataType in 
zip(result.schema.names, result.schema.types)
             ]
+            if len(actual_cols_and_types) != len(expected_cols_and_types):
+                raise PySparkRuntimeError(
+                    errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
+                    messageParameters={
+                        "expected": str(len(expected_cols_and_types)),
+                        "actual": str(len(actual_cols_and_types)),
+                    },
+                )
             column_types = [
                 (expected_name, expected_type, actual_type)
                 for (expected_name, expected_type), (actual_name, actual_type) 
in zip(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to