This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 115d3a956287 [SPARK-51506][PYTHON][SS] Do not enforce users to implement close() in TransformWithStateInPandas
115d3a956287 is described below

commit 115d3a95628773978177a1c39e006c7921596c90
Author: bogao007 <bo....@databricks.com>
AuthorDate: Fri Mar 14 09:31:43 2025 +0900

    [SPARK-51506][PYTHON][SS] Do not enforce users to implement close() in TransformWithStateInPandas
    
    ### What changes were proposed in this pull request?
    
    Do not require users to implement `close()` in TransformWithStateInPandas, since `close()` is an optional method.
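
    A minimal sketch of what this enables, assuming the standard TransformWithStateInPandas value-state API (the processor name, state name, and schema below are illustrative, not part of this patch): a user-defined processor can now override only `init()` and `handleInputRows()` and inherit the no-op `close()` from the base class.

    ```python
    import pandas as pd

    from pyspark.sql.streaming.stateful_processor import (
        StatefulProcessor,
        StatefulProcessorHandle,
    )


    class CountProcessor(StatefulProcessor):  # hypothetical example processor
        def init(self, handle: StatefulProcessorHandle) -> None:
            # Hypothetical per-key running count kept in value state.
            self.count_state = handle.getValueState("count", "count int")

        def handleInputRows(self, key, rows, timerValues):
            count = self.count_state.get()[0] if self.count_state.exists() else 0
            count += sum(len(pdf) for pdf in rows)
            self.count_state.update((count,))
            yield pd.DataFrame({"id": key, "count": [count]})

        # No close() override needed anymore; the base class provides a no-op.
    ```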
    
    ### Why are the changes needed?
    
    An optional method should not be mandatory for users to implement. This also aligns the Python behavior with the Scala transformWithState (TWS) operator.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Updated the existing unit tests so that several stateful processors no longer implement the `close()` function.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50272 from bogao007/SPARK-51506.
    
    Authored-by: bogao007 <bo....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit a8e92e4b1b0671db825e188f2078aedf182bba49)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 python/pyspark/sql/streaming/stateful_processor.py                  | 1 -
 python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py | 6 ------
 2 files changed, 7 deletions(-)

diff --git a/python/pyspark/sql/streaming/stateful_processor.py b/python/pyspark/sql/streaming/stateful_processor.py
index d836ec4e2e82..54684234bc36 100644
--- a/python/pyspark/sql/streaming/stateful_processor.py
+++ b/python/pyspark/sql/streaming/stateful_processor.py
@@ -426,7 +426,6 @@ class StatefulProcessor(ABC):
         """
         return iter([])
 
-    @abstractmethod
     def close(self) -> None:
         """
         Function called as the last method that allows for users to perform any cleanup or teardown
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index a9a0bbb31d49..26af261be104 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -1775,9 +1775,6 @@ class TTLStatefulProcessor(StatefulProcessor):
             }
         )
 
-    def close(self) -> None:
-        pass
-
 
 class InvalidSimpleStatefulProcessor(StatefulProcessor):
     def init(self, handle: StatefulProcessorHandle) -> None:
@@ -1793,9 +1790,6 @@ class InvalidSimpleStatefulProcessor(StatefulProcessor):
         self.num_violations_state.clear()
         yield pd.DataFrame({"id": key, "countAsString": str(count)})
 
-    def close(self) -> None:
-        pass
-
 
 class ListStateProcessor(StatefulProcessor):
     # Dict to store the expected results. The key represents the grouping key string, and the value


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
