This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e921a74ba2b3 [SPARK-53414][PYTHON][TESTS] Add tests for Arrow UDF with profiler
e921a74ba2b3 is described below
commit e921a74ba2b301eb97b550db5c3e4b6e8ee15b7d
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Aug 28 15:00:09 2025 +0800
[SPARK-53414][PYTHON][TESTS] Add tests for Arrow UDF with profiler
### What changes were proposed in this pull request?
Add tests for Arrow UDF with profiler
### Why are the changes needed?
for test coverage, to make sure arrow udf works with profiler
### Does this PR introduce _any_ user-facing change?
no, test-only
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #52158 from zhengruifeng/arrow_udf_profiler.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/tests/test_udf_profiler.py | 87 +++++++++++++++++++++++++++
1 file changed, 87 insertions(+)
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 750cc59a93d9..de35532285df 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -297,6 +297,29 @@ class UDFProfiler2TestsMixin:
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf(self):
    """Scalar Arrow UDFs should be captured by the "perf" UDF profiler."""
    import pyarrow as pa

    @arrow_udf("long")
    def add1(x):
        return pa.compute.add(x, 1)

    @arrow_udf("long")
    def add2(x):
        return pa.compute.add(x, 2)

    # Enable the perf profiler only for the duration of the action.
    with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
        df = self.spark.range(10, numPartitions=2).select(
            add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
        )
        df.collect()

    # NOTE(review): 3 profile entries for 4 UDF calls — presumably the
    # duplicated add1("id") expression is de-duplicated; confirm against
    # the profiler implementation.
    self.assertEqual(3, len(self.profile_results), str(self.profile_results.keys()))

    for id in self.profile_results:
        self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
@@ -324,6 +347,30 @@ class UDFProfiler2TestsMixin:
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf_iterator_not_supported(self):
    """Iterator-style Arrow UDFs are skipped by the "perf" profiler.

    Mixes a scalar Arrow UDF (add1) with an iterator-of-Arrays Arrow UDF
    (add2) and checks that only one profile result is recorded.
    """
    import pyarrow as pa

    @arrow_udf("long")
    def add1(x):
        return pa.compute.add(x, 1)

    @arrow_udf("long")
    def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]:
        for s in iter:
            yield pa.compute.add(s, 2)

    with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
        df = self.spark.range(10, numPartitions=2).select(
            add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
        )
        df.collect()

    # NOTE(review): only 1 entry expected — apparently the scalar add1
    # alone is profiled while the iterator UDF is not; confirm.
    self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

    for id in self.profile_results:
        self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
@@ -365,6 +412,27 @@ class UDFProfiler2TestsMixin:
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=5)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf_window(self):
    """An Arrow UDF applied over a window is captured by the "perf" profiler."""
    import pyarrow as pa

    @arrow_udf("double")
    def mean_udf(v: pa.Array) -> float:
        return pa.compute.mean(v)

    df = self.spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
    )
    # Sliding frame: previous row and current row within each id partition.
    w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)

    with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
        df.withColumn("mean_v", mean_udf("v").over(w)).show()

    self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

    for id in self.profile_results:
        self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=5)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
@@ -388,6 +456,25 @@ class UDFProfiler2TestsMixin:
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf_agg(self):
    """An Arrow UDF used as a grouped aggregation is captured by the "perf" profiler."""
    import pyarrow as pa

    @arrow_udf("double")
    def min_udf(v: pa.Array) -> float:
        return pa.compute.min(v)

    with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
        df = self.spark.createDataFrame(
            [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"]
        )
        # One UDF expression -> one expected profile result.
        df.groupBy(df.name).agg(min_udf(df.age)).show()

    self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

    for id in self.profile_results:
        self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]