This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 1ac345283ce7 [SPARK-55326][PYTHON][CONNECT][FOLLOWUP] Skip cleanup RPCs in _on_exit when client is already closed
1ac345283ce7 is described below
commit 1ac345283ce7c288cb5ffa3b9dc38add80565ea7
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Jun 1 14:04:02 2026 +0800
[SPARK-55326][PYTHON][CONNECT][FOLLOWUP] Skip cleanup RPCs in _on_exit when client is already closed
### What changes were proposed in this pull request?
Follow-up to #54106. `SparkConnectClient._on_exit` is registered with
`atexit` and runs during process shutdown. Currently it unconditionally calls
`_cleanup_ml_cache()` even when the user has already explicitly closed the
client; only the `release_session()` / `close()` branch, taken when
`SPARK_CONNECT_RELEASE_SESSION_ON_EXIT` is set, checks `self._closed` first.
This PR short-circuits `_on_exit` when `self._closed` is `True`. The
server-side resources were already released by the explicit `close()`, so
reissuing the cleanup RPCs is wasted work and, if the server has since become
unreachable, can block process exit on the gRPC call.
The `and not self._closed` guard on the `_release_session_on_exit` branch
is now redundant, because that branch is only reached when `self._closed` is
`False`, so it has been removed.
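A minimal sketch of the resulting control flow, assuming a hypothetical
`ToyClient` that stands in for `SparkConnectClient` and only records simulated
RPCs in a list instead of talking to a server. The `release_session()` then
`close()` order follows the description above; the real method body after
`except Exception:` is not shown in the diff, so that part is an assumption.

```python
import atexit
from typing import List


class ToyClient:
    """Hypothetical stand-in for SparkConnectClient; RPCs are only logged."""

    def __init__(self, release_session_on_exit: bool = False) -> None:
        self._closed = False
        self._release_session_on_exit = release_session_on_exit
        self.rpc_log: List[str] = []  # names of simulated RPCs, in call order
        # Like the real client, run _on_exit during interpreter shutdown.
        atexit.register(self._on_exit)

    def _cleanup_ml_cache(self) -> None:
        self.rpc_log.append("cleanup_ml_cache")  # stands in for an ExecutePlan RPC

    def release_session(self) -> None:
        self.rpc_log.append("release_session")

    def close(self) -> None:
        self.rpc_log.append("close")
        self._closed = True

    def _on_exit(self) -> None:
        # The guard this PR adds: an explicitly closed client issues no RPCs.
        if self._closed:
            return
        self._cleanup_ml_cache()
        # No "and not self._closed" needed: _closed is known to be False here.
        if self._release_session_on_exit:
            try:
                self.release_session()
            except Exception:
                pass  # assumed: best-effort release during shutdown
            self.close()


closed = ToyClient(release_session_on_exit=True)
closed.close()
closed._on_exit()
print(closed.rpc_log)  # ['close']

still_open = ToyClient(release_session_on_exit=True)
still_open._on_exit()
print(still_open.rpc_log)  # ['cleanup_ml_cache', 'release_session', 'close']
```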
### Why are the changes needed?
After an explicit `close()`, the `atexit` handler should be a no-op. The
previous behavior issued a fresh `ExecutePlan` RPC (via `_cleanup_ml_cache`) at
interpreter shutdown, which served no purpose and could hang the process if the
server was no longer reachable.
See https://github.com/apache/spark/pull/54106#issuecomment-4079259041 for
context.
Note: an earlier revision of this PR also tried to bound the cleanup RPCs
with a gRPC per-call deadline for the case where the user never called
`close()` and the server is unreachable. That required `_cleanup_ml_cache` to
bypass `execute_command` and call the stub directly, which gaogaotiantian
flagged as a larger change that needs more discussion. The unreachable-server
case for non-closed clients is left for a separate PR; this PR is scoped to the
uncontroversial post-`close()` no-op.
### Does this PR introduce _any_ user-facing change?
No. The behavior change is limited to making `_on_exit` a no-op after an
explicit `close()`.
### How was this patch tested?
Extended the existing `test_on_exit_does_not_call_when_already_closed` in
`test_client.py` to also assert that `_cleanup_ml_cache` is not invoked when
the client is already closed.
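The extended test counts calls with hand-rolled mock functions. An equivalent
check can be sketched with `unittest.mock.patch.object` and
`assert_not_called()`. This is a hypothetical, self-contained version:
`_Client` is a made-up stand-in whose `_on_exit` mirrors the guarded logic (the
real `SparkConnectClient` is not used), and the second test, covering the
not-yet-closed path, is an illustration that does not exist in `test_client.py`.

```python
import unittest
from unittest import mock


class _Client:
    """Hypothetical minimal client; only the _on_exit guard mirrors the PR."""

    def __init__(self) -> None:
        self._closed = False
        self._release_session_on_exit = True

    def _cleanup_ml_cache(self) -> None:
        """Would issue an ExecutePlan RPC in the real client."""

    def release_session(self) -> None:
        """Would release the server-side session in the real client."""

    def close(self) -> None:
        self._closed = True

    def _on_exit(self) -> None:
        if self._closed:
            return
        self._cleanup_ml_cache()
        if self._release_session_on_exit:
            self.release_session()
            self.close()


class OnExitTest(unittest.TestCase):
    def test_on_exit_does_not_call_when_already_closed(self) -> None:
        client = _Client()
        client._closed = True
        # Replace each cleanup method on this instance with a MagicMock.
        with mock.patch.object(client, "_cleanup_ml_cache") as cleanup, \
                mock.patch.object(client, "release_session") as release, \
                mock.patch.object(client, "close") as close_mock:
            client._on_exit()
        cleanup.assert_not_called()
        release.assert_not_called()
        close_mock.assert_not_called()

    def test_on_exit_runs_cleanup_when_open(self) -> None:
        client = _Client()
        with mock.patch.object(client, "_cleanup_ml_cache") as cleanup, \
                mock.patch.object(client, "release_session") as release:
            client._on_exit()
        cleanup.assert_called_once_with()
        release.assert_called_once_with()
        self.assertTrue(client._closed)  # the real close() ran unpatched
```

Saved as a module, the sketch runs with `python -m unittest <module>`.
`patch.object` restores the original methods when each `with` block exits, so
the tests do not leak state into one another.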
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
Closes #56140 from cloud-fan/release-on-exit-followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 75ea3db736bcafe1de67007d521e3bc0b346a531)
Signed-off-by: Wenchen Fan <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 9 ++++++++-
python/pyspark/sql/tests/connect/client/test_client.py | 7 ++++++-
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 925b114d070d..b2e30a8325db 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -2283,8 +2283,15 @@ class SparkConnectClient(object):
return []
def _on_exit(self) -> None:
+ # If the client has already been explicitly closed, skip all cleanup RPCs.
+ # The server-side resources were released by close(); reissuing them here
+ # is wasted work and, if the server has since become unreachable, can
+ # block process exit on the gRPC call.
+ if self._closed:
+ return
+
self._cleanup_ml_cache()
- if self._release_session_on_exit and not self._closed:
+ if self._release_session_on_exit:
try:
self.release_session()
except Exception:
diff --git a/python/pyspark/sql/tests/connect/client/test_client.py b/python/pyspark/sql/tests/connect/client/test_client.py
index 85fbafe22728..9b0f59522e25 100644
--- a/python/pyspark/sql/tests/connect/client/test_client.py
+++ b/python/pyspark/sql/tests/connect/client/test_client.py
@@ -499,7 +499,10 @@ class SparkConnectClientTestCase(unittest.TestCase):
client._release_session_on_exit = True
client._closed = True
- call_tracker = {"release_session": 0, "close": 0}
+ call_tracker = {"cleanup_ml_cache": 0, "release_session": 0, "close": 0}
+
+ def mock_cleanup_ml_cache():
+ call_tracker["cleanup_ml_cache"] += 1
def mock_release_session():
call_tracker["release_session"] += 1
@@ -507,11 +510,13 @@ class SparkConnectClientTestCase(unittest.TestCase):
def mock_close():
call_tracker["close"] += 1
+ client._cleanup_ml_cache = mock_cleanup_ml_cache
client.release_session = mock_release_session
client.close = mock_close
client._on_exit()
+ self.assertEqual(call_tracker["cleanup_ml_cache"], 0)
self.assertEqual(call_tracker["release_session"], 0)
self.assertEqual(call_tracker["close"], 0)