This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 115d3a956287 [SPARK-51506][PYTHON][SS] Do not enforce users to implement close() in TransformWithStateInPandas

115d3a956287 is described below

commit 115d3a95628773978177a1c39e006c7921596c90
Author: bogao007 <bo....@databricks.com>
AuthorDate: Fri Mar 14 09:31:43 2025 +0900

[SPARK-51506][PYTHON][SS] Do not enforce users to implement close() in TransformWithStateInPandas

### What changes were proposed in this pull request?

Stop requiring users to implement `close()` in TransformWithStateInPandas, since `close()` is an optional method. The `@abstractmethod` decorator is removed from `StatefulProcessor.close()`.

### Why are the changes needed?

Users should not be forced to implement an optional method. This also aligns with the Scala transformWithState (TWS) API.

### Does this PR introduce _any_ user-facing change?

Yes. Subclasses of `StatefulProcessor` no longer need to define `close()`; they only need to override it when they have cleanup to perform.

### How was this patch tested?

Updated the existing unit tests by removing the `close()` implementation from several stateful processors.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50272 from bogao007/SPARK-51506.
Authored-by: bogao007 <bo....@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
(cherry picked from commit a8e92e4b1b0671db825e188f2078aedf182bba49)
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 python/pyspark/sql/streaming/stateful_processor.py                  | 1 -
 python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py | 6 ------
 2 files changed, 7 deletions(-)

diff --git a/python/pyspark/sql/streaming/stateful_processor.py b/python/pyspark/sql/streaming/stateful_processor.py
index d836ec4e2e82..54684234bc36 100644
--- a/python/pyspark/sql/streaming/stateful_processor.py
+++ b/python/pyspark/sql/streaming/stateful_processor.py
@@ -426,7 +426,6 @@ class StatefulProcessor(ABC):
         """
         return iter([])
 
-    @abstractmethod
     def close(self) -> None:
         """
         Function called as the last method that allows for users to perform any cleanup or teardown
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index a9a0bbb31d49..26af261be104 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -1775,9 +1775,6 @@ class TTLStatefulProcessor(StatefulProcessor):
             }
         )
 
-    def close(self) -> None:
-        pass
-
 
 class InvalidSimpleStatefulProcessor(StatefulProcessor):
     def init(self, handle: StatefulProcessorHandle) -> None:
@@ -1793,9 +1790,6 @@ class InvalidSimpleStatefulProcessor(StatefulProcessor):
         self.num_violations_state.clear()
         yield pd.DataFrame({"id": key, "countAsString": str(count)})
 
-    def close(self) -> None:
-        pass
-
 
 class ListStateProcessor(StatefulProcessor):
     # Dict to store the expected results. The key represents the grouping key string, and the value
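Removing `@abstractmethod` is enough to make `close()` optional because of how Python's standard `abc` module works: a subclass of an ABC cannot be instantiated while any abstract method is left unimplemented. The sketch below illustrates this with two simplified, hypothetical stand-ins (`ProcessorBefore`, `ProcessorAfter`, plus the subclasses `SumBefore`, `SumAfter`); they are not the PySpark `StatefulProcessor` API, which also declares `init()` and works on pandas DataFrames. It assumes the base class's `close()` does nothing by default.

```python
from abc import ABC, abstractmethod
from typing import Iterator, List


class ProcessorBefore(ABC):
    """Hypothetical stand-in for the old base class: close() is abstract,
    so every subclass must override it before it can be instantiated."""

    @abstractmethod
    def handle_input_rows(self, key: str, rows: List[int]) -> Iterator[int]:
        ...

    @abstractmethod
    def close(self) -> None:
        ...


class ProcessorAfter(ABC):
    """Hypothetical stand-in for the new base class: close() is concrete,
    so subclasses override it only when they need cleanup."""

    @abstractmethod
    def handle_input_rows(self, key: str, rows: List[int]) -> Iterator[int]:
        ...

    def close(self) -> None:
        # Assumed default: nothing to tear down.
        pass


class SumBefore(ProcessorBefore):
    # Implements only the required hook; close() is still abstract here.
    def handle_input_rows(self, key: str, rows: List[int]) -> Iterator[int]:
        yield sum(rows)


class SumAfter(ProcessorAfter):
    # Same body, but inherits the default close() from the base class.
    def handle_input_rows(self, key: str, rows: List[int]) -> Iterator[int]:
        yield sum(rows)


def can_instantiate(cls: type) -> bool:
    """Return True if cls() succeeds, False if the ABC machinery rejects it."""
    try:
        cls()
        return True
    except TypeError:
        return False


if __name__ == "__main__":
    print(can_instantiate(SumBefore))  # False: close() is still abstract
    print(can_instantiate(SumAfter))   # True: inherits the no-op close()
    proc = SumAfter()
    print(list(proc.handle_input_rows("a", [1, 2, 3])))  # [6]
    proc.close()  # safe to call; does nothing
```

This mirrors a common Python pattern: keep required hooks abstract and give optional lifecycle hooks a concrete default, so subclasses opt in to cleanup only when they hold resources.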