This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e921a74ba2b3 [SPARK-53414][PYTHON][TESTS] Add tests for Arrow UDF with profiler
e921a74ba2b3 is described below

commit e921a74ba2b301eb97b550db5c3e4b6e8ee15b7d
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Aug 28 15:00:09 2025 +0800

    [SPARK-53414][PYTHON][TESTS] Add tests for Arrow UDF with profiler
    
    ### What changes were proposed in this pull request?
    Add tests for Arrow UDF with profiler
    
    ### Why are the changes needed?
    for test coverage, to make sure arrow udf works with profiler
    
    ### Does this PR introduce _any_ user-facing change?
    no, test-only
    
    ### How was this patch tested?
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #52158 from zhengruifeng/arrow_udf_profiler.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/tests/test_udf_profiler.py | 87 +++++++++++++++++++++++++++
 1 file changed, 87 insertions(+)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 750cc59a93d9..de35532285df 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -297,6 +297,29 @@ class UDFProfiler2TestsMixin:
         for id in self.profile_results:
             self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
 
+    @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+    def test_perf_profiler_arrow_udf(self):
+        import pyarrow as pa
+
+        @arrow_udf("long")
+        def add1(x):
+            return pa.compute.add(x, 1)
+
+        @arrow_udf("long")
+        def add2(x):
+            return pa.compute.add(x, 2)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df = self.spark.range(10, numPartitions=2).select(
+                add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
+            )
+            df.collect()
+
+        self.assertEqual(3, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
+
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         cast(str, pandas_requirement_message or pyarrow_requirement_message),
@@ -324,6 +347,30 @@ class UDFProfiler2TestsMixin:
         for id in self.profile_results:
             self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
 
+    @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+    def test_perf_profiler_arrow_udf_iterator_not_supported(self):
+        import pyarrow as pa
+
+        @arrow_udf("long")
+        def add1(x):
+            return pa.compute.add(x, 1)
+
+        @arrow_udf("long")
+        def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]:
+            for s in iter:
+                yield pa.compute.add(s, 2)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df = self.spark.range(10, numPartitions=2).select(
+                add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
+            )
+            df.collect()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
+
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         cast(str, pandas_requirement_message or pyarrow_requirement_message),
@@ -365,6 +412,27 @@ class UDFProfiler2TestsMixin:
         for id in self.profile_results:
             self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=5)
 
+    @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+    def test_perf_profiler_arrow_udf_window(self):
+        import pyarrow as pa
+
+        @arrow_udf("double")
+        def mean_udf(v: pa.Array) -> float:
+            return pa.compute.mean(v)
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+        w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.withColumn("mean_v", mean_udf("v").over(w)).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=5)
+
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         cast(str, pandas_requirement_message or pyarrow_requirement_message),
@@ -388,6 +456,25 @@ class UDFProfiler2TestsMixin:
         for id in self.profile_results:
             self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
 
+    @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+    def test_perf_profiler_arrow_udf_agg(self):
+        import pyarrow as pa
+
+        @arrow_udf("double")
+        def min_udf(v: pa.Array) -> float:
+            return pa.compute.min(v)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df = self.spark.createDataFrame(
+                [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"]
+            )
+            df.groupBy(df.name).agg(min_udf(df.age)).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
+
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         cast(str, pandas_requirement_message or pyarrow_requirement_message),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to