This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7427ff46db45 [SPARK-53979][PYTHON][TESTS] Drop temporary functions in 
Pandas UDF tests
7427ff46db45 is described below

commit 7427ff46db45e9ef5ad7bbb99dab465c3c470904
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Oct 22 14:59:51 2025 +0800

    [SPARK-53979][PYTHON][TESTS] Drop temporary functions in Pandas UDF tests
    
    ### What changes were proposed in this pull request?
    Drop temporary functions in Pandas UDF tests
    
    ### Why are the changes needed?
    To ensure isolation of testing environments.
    
    ### Does this PR introduce _any_ user-facing change?
    No, this is a test-only change.
    
    ### How was this patch tested?
    Verified by CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #52690 from zhengruifeng/with_temp_func_pd.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../sql/tests/pandas/test_pandas_grouped_map.py    |  35 ++--
 .../tests/pandas/test_pandas_udf_grouped_agg.py    |  38 +++--
 .../sql/tests/pandas/test_pandas_udf_scalar.py     | 182 +++++++++++----------
 .../sql/tests/pandas/test_pandas_udf_window.py     |   6 +-
 python/pyspark/sql/tests/test_udf.py               |   3 +-
 5 files changed, 140 insertions(+), 124 deletions(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index fb81cd772777..4c52303481fa 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -208,25 +208,22 @@ class ApplyInPandasTestsMixin:
         assert_frame_equal(expected, result)
 
     def test_register_grouped_map_udf(self):
-        with self.quiet():
-            self.check_register_grouped_map_udf()
-
-    def check_register_grouped_map_udf(self):
-        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
-
-        with self.assertRaises(PySparkTypeError) as pe:
-            self.spark.catalog.registerFunction("foo_udf", foo_udf)
-
-        self.check_error(
-            exception=pe.exception,
-            errorClass="INVALID_UDF_EVAL_TYPE",
-            messageParameters={
-                "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
-                "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
-                "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
-                "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF"
-            },
-        )
+        with self.quiet(), self.temp_func("foo_udf"):
+            foo_udf = pandas_udf(lambda x: x, "id long", 
PandasUDFType.GROUPED_MAP)
+
+            with self.assertRaises(PySparkTypeError) as pe:
+                self.spark.catalog.registerFunction("foo_udf", foo_udf)
+
+            self.check_error(
+                exception=pe.exception,
+                errorClass="INVALID_UDF_EVAL_TYPE",
+                messageParameters={
+                    "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
+                    "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
+                    "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
+                    "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF"
+                },
+            )
 
     def test_decorator(self):
         df = self.data
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index cfcbb96fcc36..3fd970061b30 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -522,17 +522,23 @@ class GroupedAggPandasUDFTestsMixin:
             df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
     def test_register_vectorized_udf_basic(self):
-        sum_pandas_udf = pandas_udf(
-            lambda v: v.sum(), "integer", 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
-        )
+        with self.temp_func("sum_pandas_udf"):
+            sum_pandas_udf = pandas_udf(
+                lambda v: v.sum(), "integer", 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+            )
 
-        self.assertEqual(sum_pandas_udf.evalType, 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
-        group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf", 
sum_pandas_udf)
-        self.assertEqual(group_agg_pandas_udf.evalType, 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
-        q = "SELECT sum_pandas_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) 
tbl(v1, v2) GROUP BY v2"
-        actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
-        expected = [1, 5]
-        self.assertEqual(actual, expected)
+            self.assertEqual(sum_pandas_udf.evalType, 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+            group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf", 
sum_pandas_udf)
+            self.assertEqual(
+                group_agg_pandas_udf.evalType, 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+            )
+            q = """
+                SELECT sum_pandas_udf(v1)
+                FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2
+                """
+            actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
+            expected = [1, 5]
+            self.assertEqual(actual, expected)
 
     def test_grouped_with_empty_partition(self):
         data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
@@ -551,10 +557,10 @@ class GroupedAggPandasUDFTestsMixin:
             return v.max()
 
         df = self.spark.range(0, 100)
-        self.spark.udf.register("max_udf", max_udf)
 
-        with self.tempView("table"):
+        with self.tempView("table"), self.temp_func("max_udf"):
             df.createTempView("table")
+            self.spark.udf.register("max_udf", max_udf)
 
             agg1 = df.agg(max_udf(df["id"]))
             agg2 = self.spark.sql("select max_udf(id) from table")
@@ -579,7 +585,7 @@ class GroupedAggPandasUDFTestsMixin:
         df = self.data
         weighted_mean = self.pandas_agg_weighted_mean_udf
 
-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)
 
@@ -604,7 +610,7 @@ class GroupedAggPandasUDFTestsMixin:
         df = self.data
         weighted_mean = self.pandas_agg_weighted_mean_udf
 
-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)
 
@@ -644,7 +650,7 @@ class GroupedAggPandasUDFTestsMixin:
 
             return np.average(kwargs["v"], weights=kwargs["w"])
 
-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)
 
@@ -684,7 +690,7 @@ class GroupedAggPandasUDFTestsMixin:
         def biased_sum(v, w=None):
             return v.sum() + (w.sum() if w is not None else 100)
 
-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("biased_sum"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("biased_sum", biased_sum)
 
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index e614d9039b61..3c2ae56067ae 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -293,13 +293,17 @@ class ScalarPandasUDFTestsMixin:
         ).asNondeterministic()
         self.assertEqual(random_pandas_udf.deterministic, False)
         self.assertEqual(random_pandas_udf.evalType, 
PythonEvalType.SQL_SCALAR_PANDAS_UDF)
-        nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
-            "randomPandasUDF", random_pandas_udf
-        )
-        self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
-        self.assertEqual(nondeterministic_pandas_udf.evalType, 
PythonEvalType.SQL_SCALAR_PANDAS_UDF)
-        [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
-        self.assertEqual(row[0], 7)
+
+        with self.temp_func("randomPandasUDF"):
+            nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
+                "randomPandasUDF", random_pandas_udf
+            )
+            self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
+            self.assertEqual(
+                nondeterministic_pandas_udf.evalType, 
PythonEvalType.SQL_SCALAR_PANDAS_UDF
+            )
+            [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
+            self.assertEqual(row[0], 7)
 
         def random_iter_udf(it):
             for i in it:
@@ -310,15 +314,17 @@ class ScalarPandasUDFTestsMixin:
         ).asNondeterministic()
         self.assertEqual(random_pandas_iter_udf.deterministic, False)
         self.assertEqual(random_pandas_iter_udf.evalType, 
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
-        nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction(
-            "randomPandasIterUDF", random_pandas_iter_udf
-        )
-        self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False)
-        self.assertEqual(
-            nondeterministic_pandas_iter_udf.evalType, 
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
-        )
-        [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect()
-        self.assertEqual(row[0], 7)
+
+        with self.temp_func("randomPandasIterUDF"):
+            nondeterministic_pandas_iter_udf = 
self.spark.catalog.registerFunction(
+                "randomPandasIterUDF", random_pandas_iter_udf
+            )
+            self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, 
False)
+            self.assertEqual(
+                nondeterministic_pandas_iter_udf.evalType, 
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
+            )
+            [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect()
+            self.assertEqual(row[0], 7)
 
     def test_vectorized_udf_null_boolean(self):
         data = [(True,), (True,), (None,), (False,)]
@@ -1397,14 +1403,16 @@ class ScalarPandasUDFTestsMixin:
 
         for original_add in [scalar_original_add, iter_original_add]:
             self.assertEqual(original_add.deterministic, True)
-            new_add = self.spark.catalog.registerFunction("add1", original_add)
-            res1 = df.select(new_add(col("a"), col("b")))
-            res2 = self.spark.sql(
-                "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM 
range(10)) t"
-            )
-            expected = df.select(expr("a + b"))
-            self.assertEqual(expected.collect(), res1.collect())
-            self.assertEqual(expected.collect(), res2.collect())
+
+            with self.temp_func("add1"):
+                new_add = self.spark.catalog.registerFunction("add1", 
original_add)
+                res1 = df.select(new_add(col("a"), col("b")))
+                res2 = self.spark.sql(
+                    "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM 
range(10)) t"
+                )
+                expected = df.select(expr("a + b"))
+                self.assertEqual(expected.collect(), res1.collect())
+                self.assertEqual(expected.collect(), res2.collect())
 
     def test_scalar_iter_udf_init(self):
         import numpy as np
@@ -1788,92 +1796,96 @@ class ScalarPandasUDFTestsMixin:
         def test_udf(a, b):
             return a + 10 * b
 
-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)
 
-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(col("id"), b=col("id") * 
10)),
-                self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 
10)),
-                self.spark.range(2).select(test_udf(b=col("id") * 10, 
a=col("id"))),
-                self.spark.sql("SELECT test_udf(id, b => id * 10) FROM 
range(2)"),
-                self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM 
range(2)"),
-                self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM 
range(2)"),
-            ]
-        ):
-            with self.subTest(query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(101)])
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(col("id"), b=col("id") 
* 10)),
+                    self.spark.range(2).select(test_udf(a=col("id"), 
b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(b=col("id") * 10, 
a=col("id"))),
+                    self.spark.sql("SELECT test_udf(id, b => id * 10) FROM 
range(2)"),
+                    self.spark.sql("SELECT test_udf(a => id, b => id * 10) 
FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(b => id * 10, a => id) 
FROM range(2)"),
+                ]
+            ):
+                with self.subTest(query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(101)])
 
     def test_named_arguments_negative(self):
         @pandas_udf("int")
         def test_udf(a, b):
             return a + b
 
-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)
 
-        with self.assertRaisesRegex(
-            AnalysisException,
-            
"DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
-        ):
-            self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM 
range(2)").show()
+            with self.assertRaisesRegex(
+                AnalysisException,
+                
"DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
+            ):
+                self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM 
range(2)").show()
 
-        with self.assertRaisesRegex(AnalysisException, 
"UNEXPECTED_POSITIONAL_ARGUMENT"):
-            self.spark.sql("SELECT test_udf(a => id, id * 10) FROM 
range(2)").show()
+            with self.assertRaisesRegex(AnalysisException, 
"UNEXPECTED_POSITIONAL_ARGUMENT"):
+                self.spark.sql("SELECT test_udf(a => id, id * 10) FROM 
range(2)").show()
 
-        with self.assertRaisesRegex(
-            PythonException, r"test_udf\(\) got an unexpected keyword argument 
'c'"
-        ):
-            self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show()
+            with self.assertRaisesRegex(
+                PythonException, r"test_udf\(\) got an unexpected keyword 
argument 'c'"
+            ):
+                self.spark.sql("SELECT test_udf(c => 'x') FROM 
range(2)").show()
 
     def test_kwargs(self):
         @pandas_udf("int")
         def test_udf(a, **kwargs):
             return a + 10 * kwargs["b"]
 
-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)
 
-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 
10)),
-                self.spark.range(2).select(test_udf(b=col("id") * 10, 
a=col("id"))),
-                self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM 
range(2)"),
-                self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM 
range(2)"),
-            ]
-        ):
-            with self.subTest(query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(101)])
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(a=col("id"), 
b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(b=col("id") * 10, 
a=col("id"))),
+                    self.spark.sql("SELECT test_udf(a => id, b => id * 10) 
FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(b => id * 10, a => id) 
FROM range(2)"),
+                ]
+            ):
+                with self.subTest(query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(101)])
 
     def test_named_arguments_and_defaults(self):
         @pandas_udf("int")
         def test_udf(a, b=0):
             return a + 10 * b
 
-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)
 
-        # without "b"
-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(col("id"))),
-                self.spark.range(2).select(test_udf(a=col("id"))),
-                self.spark.sql("SELECT test_udf(id) FROM range(2)"),
-                self.spark.sql("SELECT test_udf(a => id) FROM range(2)"),
-            ]
-        ):
-            with self.subTest(with_b=False, query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(1)])
+            # without "b"
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(col("id"))),
+                    self.spark.range(2).select(test_udf(a=col("id"))),
+                    self.spark.sql("SELECT test_udf(id) FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(a => id) FROM range(2)"),
+                ]
+            ):
+                with self.subTest(with_b=False, query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(1)])
 
-        # with "b"
-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(col("id"), b=col("id") * 
10)),
-                self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 
10)),
-                self.spark.range(2).select(test_udf(b=col("id") * 10, 
a=col("id"))),
-                self.spark.sql("SELECT test_udf(id, b => id * 10) FROM 
range(2)"),
-                self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM 
range(2)"),
-                self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM 
range(2)"),
-            ]
-        ):
-            with self.subTest(with_b=True, query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(101)])
+            # with "b"
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(col("id"), b=col("id") 
* 10)),
+                    self.spark.range(2).select(test_udf(a=col("id"), 
b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(b=col("id") * 10, 
a=col("id"))),
+                    self.spark.sql("SELECT test_udf(id, b => id * 10) FROM 
range(2)"),
+                    self.spark.sql("SELECT test_udf(a => id, b => id * 10) 
FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(b => id * 10, a => id) 
FROM range(2)"),
+                ]
+            ):
+                with self.subTest(with_b=True, query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(101)])
 
     def test_arrow_cast_enabled_numeric_to_decimal(self):
         import numpy as np
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
index fbc2b32d1c69..547e237902b3 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
@@ -408,7 +408,7 @@ class WindowPandasUDFTestsMixin:
                 with self.subTest(bound=bound, query_no=i):
                     assertDataFrameEqual(windowed, df.withColumn("wm", 
sf.mean(df.v).over(w)))
 
-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)
 
@@ -436,7 +436,7 @@ class WindowPandasUDFTestsMixin:
         df = self.data
         weighted_mean = self.pandas_agg_weighted_mean_udf
 
-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)
 
@@ -504,7 +504,7 @@ class WindowPandasUDFTestsMixin:
                 with self.subTest(bound=bound, query_no=i):
                     assertDataFrameEqual(windowed, df.withColumn("wm", 
sf.mean(df.v).over(w)))
 
-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)
 
diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index b1fb42ad11ec..8d792a54e346 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -216,7 +216,7 @@ class BaseUDFTestsMixin(object):
             self.assertEqual(tuple(row), (2,))
 
     def test_multiple_udfs(self):
-        with self.temp_func("double_int"):
+        with self.temp_func("double_int", "add_int"):
             self.spark.catalog.registerFunction("double_int", lambda x: x * 2, 
IntegerType())
             [row] = self.spark.sql("SELECT double_int(1), 
double_int(2)").collect()
             self.assertEqual(tuple(row), (2, 4))
@@ -224,6 +224,7 @@ class BaseUDFTestsMixin(object):
                 "SELECT double_int(double_int(1)), double_int(double_int(2) + 
2)"
             ).collect()
             self.assertEqual(tuple(row), (4, 12))
+
             self.spark.catalog.registerFunction("add_int", lambda x, y: x + y, 
IntegerType())
             [row] = self.spark.sql(
                 "SELECT double_int(add_int(1, 2)), add_int(double_int(2), 1)"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to