This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a77cdbdbcb9d 
[SPARK-54481][SPARK-54495][SPARK-54483][4.0][TEST][PYTHON][CONNECT] Fix failing 
tests
a77cdbdbcb9d is described below

commit a77cdbdbcb9d80e2b176037554efffbf825274a6
Author: xianzhe-databricks <[email protected]>
AuthorDate: Mon Dec 1 18:48:52 2025 +0900

    [SPARK-54481][SPARK-54495][SPARK-54483][4.0][TEST][PYTHON][CONNECT] Fix 
failing tests
    
    ### What changes were proposed in this pull request?
    Three tests fail in the cross-version test run (Spark Connect 4.0 client
    against a Spark Connect 4.2 server). This PR fixes them.
    
    We basically cherry-picked the fixes from these PRs:
    * https://github.com/apache/spark/pull/51635
    * https://github.com/apache/spark/pull/51120
    
    ### Why are the changes needed?
    
    Make post-merge CI happy!
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Following 
https://github.com/apache/spark/blob/master/.github/workflows/build_python_connect40.yml#L85-L106,
 I manually tested it on my local machine.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    I finished this PR with the help of Claude Code.
    
    Closes #53250 from xianzhe-databricks/SPARK-54481-reenable-tests.
    
    Lead-authored-by: xianzhe-databricks <[email protected]>
    Co-authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/tests/test_typedef.py        |  8 ++--
 .../sql/tests/pandas/test_pandas_cogrouped_map.py  |  6 +--
 python/pyspark/sql/tests/pandas/test_pandas_map.py | 49 +++++++++++++---------
 3 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/python/pyspark/pandas/tests/test_typedef.py 
b/python/pyspark/pandas/tests/test_typedef.py
index 62ba2672de19..afed59660d74 100644
--- a/python/pyspark/pandas/tests/test_typedef.py
+++ b/python/pyspark/pandas/tests/test_typedef.py
@@ -311,13 +311,9 @@ class TypeHintTestsMixin:
 
         self.assertRaisesRegex(TypeError, "object.*not understood", 
try_infer_return_type)
 
-    @unittest.skipIf(
-        os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", 
"SPARK-54495: To be reenabled"
-    )
     def test_as_spark_type_pandas_on_spark_dtype(self):
         type_mapper = {
             # binary
-            np.character: (np.character, BinaryType()),
             np.bytes_: (np.bytes_, BinaryType()),
             bytes: (np.bytes_, BinaryType()),
             # integer
@@ -352,6 +348,10 @@ class TypeHintTestsMixin:
             ),
         }
 
+        if LooseVersion(np.__version__) < LooseVersion("2.3"):
+            # binary
+            type_mapper.update({np.character: (np.character, BinaryType())})
+
         for numpy_or_python_type, (dtype, spark_type) in type_mapper.items():
             self.assertEqual(as_spark_type(numpy_or_python_type), spark_type)
             self.assertEqual(pandas_on_spark_type(numpy_or_python_type), 
(dtype, spark_type))
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
index ab937cb68f74..3a6ab9c98ebb 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
@@ -17,7 +17,6 @@
 
 import unittest
 from typing import cast
-import os
 
 from pyspark.sql.functions import array, explode, col, lit, udf, pandas_udf, 
sum
 from pyspark.sql.types import (
@@ -240,9 +239,6 @@ class CogroupedApplyInPandasTestsMixin:
 
         self._test_merge_empty(fn=merge_pandas)
 
-    @unittest.skipIf(
-        os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", 
"SPARK-54481: To be reenabled"
-    )
     def test_apply_in_pandas_returning_incompatible_type(self):
         with self.quiet():
             self.check_apply_in_pandas_returning_incompatible_type()
@@ -266,7 +262,7 @@ class CogroupedApplyInPandasTestsMixin:
                             
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
                         )
                     self._test_merge_error(
-                        fn=lambda lft, rgt: pd.DataFrame({"id": [1], "k": 
["2.0"]}),
+                        fn=lambda lft, rgt: pd.DataFrame({"id": [1], "k": 
["test_string"]}),
                         output_schema="id long, k double",
                         errorClass=PythonException,
                         error_message_regex=expected,
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_map.py
index a9f66b161403..e5d0b56be691 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py
@@ -246,24 +246,22 @@ class MapInPandasTestsMixin:
         actual = df.repartition(1).mapInPandas(f, "id long, value 
long").collect()
         self.assertEqual(actual, expected)
 
-    @unittest.skipIf(
-        os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", 
"SPARK-54483: To be reenabled"
-    )
     def test_dataframes_with_incompatible_types(self):
         with self.quiet():
             self.check_dataframes_with_incompatible_types()
 
     def check_dataframes_with_incompatible_types(self):
-        def func(iterator):
-            for pdf in iterator:
-                yield pdf.assign(id=pdf["id"].apply(str))
-
         for safely in [True, False]:
             with self.subTest(convertToArrowArraySafely=safely), self.sql_conf(
                 {"spark.sql.execution.pandas.convertToArrowArraySafely": 
safely}
             ):
                 # sometimes we see ValueErrors
                 with self.subTest(convert="string to double"):
+
+                    def func(iterator):
+                        for pdf in iterator:
+                            yield pdf.assign(id="test_string")
+
                     expected = (
                         r"ValueError: Exception thrown when converting 
pandas.Series "
                         r"\(object\) with name 'id' to Arrow Array \(double\)."
@@ -282,18 +280,31 @@ class MapInPandasTestsMixin:
                             .collect()
                         )
 
-                # sometimes we see TypeErrors
-                with self.subTest(convert="double to string"):
-                    with self.assertRaisesRegex(
-                        PythonException,
-                        r"TypeError: Exception thrown when converting 
pandas.Series "
-                        r"\(float64\) with name 'id' to Arrow Array 
\(string\).\n",
-                    ):
-                        (
-                            self.spark.range(10, numPartitions=3)
-                            .select(col("id").cast("double"))
-                            .mapInPandas(self.identity_dataframes_iter("id"), 
"id string")
-                            .collect()
+                with self.subTest(convert="float to int precision loss"):
+
+                    def func(iterator):
+                        for pdf in iterator:
+                            yield pdf.assign(id=pdf["id"] + 0.1)
+
+                    df = (
+                        self.spark.range(10, numPartitions=3)
+                        .select(col("id").cast("double"))
+                        .mapInPandas(func, "id int")
+                    )
+                    if safely:
+                        expected = (
+                            r"ValueError: Exception thrown when converting 
pandas.Series "
+                            r"\(float64\) with name 'id' to Arrow Array 
\(int32\)."
+                            " It can be caused by overflows or other "
+                            "unsafe conversions warned by Arrow. Arrow safe 
type check "
+                            "can be disabled by using SQL config "
+                            
"`spark.sql.execution.pandas.convertToArrowArraySafely`."
+                        )
+                        with self.assertRaisesRegex(PythonException, expected 
+ "\n"):
+                            df.collect()
+                    else:
+                        self.assertEqual(
+                            df.collect(), self.spark.range(10, 
numPartitions=3).collect()
                         )
 
     def test_empty_iterator(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to