This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4914b4b689b1 [SPARK-52424][TESTS] Add unit test to verify in Spark Connect, Column object defined outside of a function can be referenced properly
4914b4b689b1 is described below

commit 4914b4b689b1bcc27db4677dd61438edcf283b8e
Author: Haoyan Geng <[email protected]>
AuthorDate: Mon Jun 23 08:55:34 2025 +0900

    [SPARK-52424][TESTS] Add unit test to verify in Spark Connect, Column object defined outside of a function can be referenced properly
    
    ### What changes were proposed in this pull request?
    
    Added a unit test verifying that a Column object defined outside of a UDF can be referenced from within the UDF as intended.
    
    ### Why are the changes needed?
    
    This isn't really an intended usage pattern, but some users implicitly rely on this behavior, so breaking it unintentionally would cause issues for them. This new unit test ensures the case is covered.
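    For context, the behavior under test depends on Python closure/global capture plus serialization: when the `func` passed to `foreachBatch` is shipped to the server, any object it references from an enclosing scope (here a `Column`) must survive a pickling round trip. A minimal sketch of that mechanism using plain `pickle` and a stand-in class (no Spark required; `FakeColumn` is illustrative only, not the real `pyspark.sql.Column`):

    ```python
    import pickle

    class FakeColumn:
        """Stand-in for a Column-like object (hypothetical, for illustration)."""
        def __init__(self, name):
            self.name = name

        def alias(self, new_name):
            return FakeColumn(new_name)

    # Defined outside the function, as in the new test.
    col = FakeColumn("value")

    def func(batch):
        # References 'col' from the enclosing module scope, just like the
        # foreachBatch function in the test references its external Column.
        return [col.alias("result").name for _ in batch]

    # Spark Connect serializes the function together with what it references;
    # here we only check that the referenced object itself round-trips.
    restored = pickle.loads(pickle.dumps(col))
    assert restored.name == "value"
    assert func([1, 2]) == ["result", "result"]
    ```

    If the captured object failed to serialize, the function could not be reconstructed on the other side, which is exactly the regression the new test guards against.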
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    This is a unit test only change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51101 from haoyangeng-db/ext-col.
    
    Authored-by: Haoyan Geng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../tests/streaming/test_streaming_foreach_batch.py | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
index 9db66aa252ee..567a26011f5e 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py
@@ -204,6 +204,27 @@ class StreamingTestsForeachBatchMixin:
             df = self.spark.read.format("text").load("python/test_support/sql/streaming")
             self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
 
+    def test_streaming_foreach_batch_external_column(self):
+        from pyspark.sql import functions as sf
+
+        table_name = "testTable_foreach_batch_external_column"
+
+        # Define 'col' outside the `func` below, so it'd have to be serialized.
+        col = sf.col("value")
+
+        def func(df: DataFrame, batch_id: int):
+            result_df = df.select(col.alias("result"))
+            result_df.write.mode("append").saveAsTable(table_name)
+
+        df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+        q = df.writeStream.foreachBatch(func).start()
+        q.processAllAvailable()
+        q.stop()
+
+        collected = self.spark.sql("select * from " + table_name).collect()
+        results = [row["result"] for row in collected]
+        self.assertEqual(sorted(results), ["hello", "this"])
+
 
 class StreamingTestsForeachBatch(StreamingTestsForeachBatchMixin, ReusedSQLTestCase):
     pass


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
