This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 1ac345283ce7 [SPARK-55326][PYTHON][CONNECT][FOLLOWUP] Skip cleanup RPCs in _on_exit when client is already closed
1ac345283ce7 is described below

commit 1ac345283ce7c288cb5ffa3b9dc38add80565ea7
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Jun 1 14:04:02 2026 +0800

    [SPARK-55326][PYTHON][CONNECT][FOLLOWUP] Skip cleanup RPCs in _on_exit when client is already closed
    
    ### What changes were proposed in this pull request?
    
    Follow-up to #54106. `SparkConnectClient._on_exit` is registered with
    `atexit` and runs during process shutdown. Before this change, it
    unconditionally called `_cleanup_ml_cache()` (and, if
    `SPARK_CONNECT_RELEASE_SESSION_ON_EXIT` is set, `release_session()` /
    `close()`) even when the user had already explicitly closed the client.
    
    This PR short-circuits `_on_exit` when `self._closed` is `True`. The
    server-side resources were already released by the explicit `close()`, so
    reissuing the cleanup RPCs is wasted work and, if the server has since
    become unreachable, can block process exit on the gRPC call.
    
    The `and not self._closed` guard on the `_release_session_on_exit` branch
    is now redundant and has been removed.
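A minimal, self-contained sketch of the early-return guard described above, assuming a hypothetical `ToyClient` whose methods only count simulated RPCs instead of talking to a server. The method names mirror those in the diff, but the bodies are illustrative and are not the actual `SparkConnectClient` implementation; in particular, whatever the real `close()` releases is simplified here to a single counted call.

```python
import atexit


class ToyClient:
    """Hypothetical stand-in for a client with an atexit cleanup hook.

    Methods only increment `rpc_count` to simulate RPCs; nothing touches
    the network.
    """

    def __init__(self, release_session_on_exit: bool = False) -> None:
        self._closed = False
        self._release_session_on_exit = release_session_on_exit
        self.rpc_count = 0
        # Run the hook at interpreter shutdown, mirroring how the real
        # client registers _on_exit with atexit.
        atexit.register(self._on_exit)

    def _cleanup_ml_cache(self) -> None:
        self.rpc_count += 1  # simulated ML-cache cleanup RPC

    def release_session(self) -> None:
        self.rpc_count += 1  # simulated release-session RPC

    def close(self) -> None:
        # Simplified: one simulated RPC for whatever server-side release
        # an explicit close() performs, then mark the client closed.
        if not self._closed:
            self.rpc_count += 1
            self._closed = True

    def _on_exit(self) -> None:
        # Early return: after an explicit close(), every call below would
        # only re-issue redundant RPCs (and could block exit).
        if self._closed:
            return
        self._cleanup_ml_cache()
        # The old `and not self._closed` check is unnecessary here because
        # the early return above already covers it.
        if self._release_session_on_exit:
            try:
                self.release_session()
            except Exception:
                pass  # best effort during shutdown
```

With `release_session_on_exit=True`, calling `close()` and then `_on_exit()` leaves `rpc_count` at 1, whereas a client that was never closed issues two simulated RPCs from `_on_exit()`.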
    
    ### Why are the changes needed?
    
    After an explicit `close()`, the `atexit` handler should be a no-op. The
    previous behavior issued a fresh `ExecutePlan` RPC (via
    `_cleanup_ml_cache`) at interpreter shutdown, which serves no purpose and
    can hang the process if the server is no longer reachable.
    
    See https://github.com/apache/spark/pull/54106#issuecomment-4079259041
    for context.
    
    Note: an earlier revision of this PR also tried to bound the cleanup RPCs
    with a gRPC per-call deadline for the case where the user never called
    `close()` and the server is unreachable. That required
    `_cleanup_ml_cache` to bypass `execute_command` and call the stub
    directly, which gaogaotiantian flagged as a larger change that needs more
    discussion. The unreachable-server case for non-closed clients is left
    for a separate PR; this PR is scoped to the uncontroversial
    post-`close()` no-op.
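For the deferred case (a client that was never closed, talking to an unreachable server), one way to bound total shutdown time is a single deadline shared across all cleanup calls. The sketch below is hypothetical and is not what this PR or its earlier revision implemented: `run_cleanup_with_budget` and its steps are illustrative names, and each step is assumed to accept a `timeout` keyword in seconds, which is how gRPC Python stub calls take a per-call deadline.

```python
import time
from typing import Callable, Iterable


def run_cleanup_with_budget(
    steps: Iterable[Callable[..., object]], budget_s: float
) -> int:
    """Call each step with timeout=<remaining budget>; return how many ran.

    Hypothetical helper: each step is assumed to accept a `timeout` keyword
    in seconds. Errors are ignored (best effort), and once the shared budget
    is spent the remaining steps are skipped.
    """
    deadline = time.monotonic() + budget_s
    ran = 0
    for step in steps:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # budget exhausted: stop rather than risk hanging exit
        try:
            step(timeout=remaining)
        except Exception:
            pass  # e.g. a deadline-exceeded error from an unreachable server
        ran += 1
    return ran
```

Passing the remaining budget, rather than a fixed per-call timeout, keeps the worst-case delay at exit near `budget_s` regardless of how many cleanup RPCs are issued.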
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The behavior change is limited to making `_on_exit` a no-op after an
    explicit `close()`.
    
    ### How was this patch tested?
    
    Extended the existing `test_on_exit_does_not_call_when_already_closed` in
    `test_client.py` to also assert that `_cleanup_ml_cache` is not invoked
    when the client is already closed.
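The test's hand-rolled `call_tracker` dict can also be expressed with `unittest.mock`. This is a minimal sketch against a hypothetical `_FakeClient` that only reproduces the guarded `_on_exit` shape; in the real suite the patched object would be the `SparkConnectClient` instance the test builds.

```python
import unittest
from unittest import mock


class _FakeClient:
    """Hypothetical minimal client with the guarded _on_exit shape."""

    def __init__(self) -> None:
        self._closed = False
        self._release_session_on_exit = True

    def _cleanup_ml_cache(self) -> None: ...

    def release_session(self) -> None: ...

    def close(self) -> None: ...

    def _on_exit(self) -> None:
        if self._closed:
            return
        self._cleanup_ml_cache()
        if self._release_session_on_exit:
            self.release_session()


class OnExitTest(unittest.TestCase):
    def test_on_exit_is_noop_when_already_closed(self) -> None:
        client = _FakeClient()
        client._closed = True
        # patch.object swaps each method for a MagicMock inside the block;
        # the mocks keep their recorded calls after the block exits.
        with mock.patch.object(client, "_cleanup_ml_cache") as cleanup, \
                mock.patch.object(client, "release_session") as release, \
                mock.patch.object(client, "close") as close:
            client._on_exit()
        cleanup.assert_not_called()
        release.assert_not_called()
        close.assert_not_called()

    def test_on_exit_runs_cleanup_when_open(self) -> None:
        client = _FakeClient()
        with mock.patch.object(client, "_cleanup_ml_cache") as cleanup, \
                mock.patch.object(client, "release_session") as release:
            client._on_exit()
        cleanup.assert_called_once_with()
        release.assert_called_once_with()
```

`assert_not_called()` fails with a message listing any unexpected calls, which can make failures easier to read than a comparison of integer counters.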
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7)
    
    Closes #56140 from cloud-fan/release-on-exit-followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 75ea3db736bcafe1de67007d521e3bc0b346a531)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 python/pyspark/sql/connect/client/core.py              | 9 ++++++++-
 python/pyspark/sql/tests/connect/client/test_client.py | 7 ++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 925b114d070d..b2e30a8325db 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -2283,8 +2283,15 @@ class SparkConnectClient(object):
             return []
 
     def _on_exit(self) -> None:
+        # If the client has already been explicitly closed, skip all cleanup RPCs.
+        # The server-side resources were released by close(); reissuing them here
+        # is wasted work and, if the server has since become unreachable, can
+        # block process exit on the gRPC call.
+        if self._closed:
+            return
+
         self._cleanup_ml_cache()
-        if self._release_session_on_exit and not self._closed:
+        if self._release_session_on_exit:
             try:
                 self.release_session()
             except Exception:
diff --git a/python/pyspark/sql/tests/connect/client/test_client.py b/python/pyspark/sql/tests/connect/client/test_client.py
index 85fbafe22728..9b0f59522e25 100644
--- a/python/pyspark/sql/tests/connect/client/test_client.py
+++ b/python/pyspark/sql/tests/connect/client/test_client.py
@@ -499,7 +499,10 @@ class SparkConnectClientTestCase(unittest.TestCase):
         client._release_session_on_exit = True
         client._closed = True
 
-        call_tracker = {"release_session": 0, "close": 0}
+        call_tracker = {"cleanup_ml_cache": 0, "release_session": 0, "close": 0}
+
+        def mock_cleanup_ml_cache():
+            call_tracker["cleanup_ml_cache"] += 1
 
         def mock_release_session():
             call_tracker["release_session"] += 1
@@ -507,11 +510,13 @@ class SparkConnectClientTestCase(unittest.TestCase):
         def mock_close():
             call_tracker["close"] += 1
 
+        client._cleanup_ml_cache = mock_cleanup_ml_cache
         client.release_session = mock_release_session
         client.close = mock_close
 
         client._on_exit()
 
+        self.assertEqual(call_tracker["cleanup_ml_cache"], 0)
         self.assertEqual(call_tracker["release_session"], 0)
         self.assertEqual(call_tracker["close"], 0)
 

