This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d2cc107560e7 [MINOR][CONNECT][PYTHON] Fix a race condition in ExecutePlanResponseReattachableIterator.shutdown
d2cc107560e7 is described below

commit d2cc107560e71c4116a115ea85f6ca18baab8849
Author: Alex Khakhlyuk <[email protected]>
AuthorDate: Fri Jul 4 08:16:10 2025 +0900

    [MINOR][CONNECT][PYTHON] Fix a race condition in ExecutePlanResponseReattachableIterator.shutdown
    
    ### What changes were proposed in this pull request?
    
    The lock that guards the shared ThreadPoolExecutor is reentrant, so the
    same thread can acquire it multiple times. As a result, a `__del__`
    finalizer can run on the same thread while the thread pool is being shut
    down and submit work to the pool that is shutting down. In this PR, we
    first set the thread pool instance to None and then call shutdown on the
    old pool.
    
    ### Why are the changes needed?
    
    We have seen race conditions in multi-threading scenarios:
    ```
    File "/app/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 352, in __del__
    return self.close()
    File "/app/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 348, in close
    self._release_all()
    File "/app/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 258, in _release_all
    self._release_thread_pool.submit(target)
    File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 171, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
    RuntimeError: cannot schedule new futures after shutdown
    ```
    This PR fixes the race condition.
    
    The problem was introduced in Spark 4.0.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51369 from khakhlyuk/fix-race-condition.
    
    Authored-by: Alex Khakhlyuk <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/client/reattach.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index 78b783371ab5..06a4fe17c29f 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -79,8 +79,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
         """
         with cls._lock:
             if cls._release_thread_pool_instance is not None:
-                cls._get_or_create_release_thread_pool().shutdown()
+                thread_pool = cls._release_thread_pool_instance
                 cls._release_thread_pool_instance = None
+                thread_pool.shutdown()
 
     def __init__(
         self,

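The ordering used in the diff above can be illustrated with a minimal, standalone sketch. It is not the code in `reattach.py`: the class `ReleasePool` and its method names are hypothetical stand-ins, and the pool size is an arbitrary assumption. The sketch only shows the general pattern of detaching a shared `ThreadPoolExecutor` under a reentrant lock before shutting it down, so that a re-entrant caller on the same thread does not pick up a pool that is already shutting down.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class ReleasePool:
    """Hypothetical stand-in for class-level thread pool handling."""

    # Reentrant lock: the thread that holds it can acquire it again.
    _lock = threading.RLock()
    _instance: Optional[ThreadPoolExecutor] = None

    @classmethod
    def get_or_create(cls) -> ThreadPoolExecutor:
        with cls._lock:
            if cls._instance is None:
                # max_workers=2 is an arbitrary choice for this sketch.
                cls._instance = ThreadPoolExecutor(max_workers=2)
            return cls._instance

    @classmethod
    def shutdown(cls) -> None:
        with cls._lock:
            if cls._instance is not None:
                # Detach the pool before shutting it down. Code that re-enters
                # on this thread now sees None and asks for a fresh pool
                # instead of submitting to one that is shutting down.
                pool = cls._instance
                cls._instance = None
                pool.shutdown()


# A pool that has been shut down rejects new work with RuntimeError.
old_pool = ReleasePool.get_or_create()
ReleasePool.shutdown()
try:
    old_pool.submit(print, "never runs")
    rejected = False
except RuntimeError:
    rejected = True

# After shutdown, callers get a fresh, usable pool.
new_pool = ReleasePool.get_or_create()
result = new_pool.submit(lambda: 1 + 1).result()
ReleasePool.shutdown()
```

With the previous ordering (shut down first, clear the reference afterwards), there is a window in which the shared reference still points at a pool that no longer accepts work, which is consistent with the `RuntimeError` in the traceback quoted in the commit message.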

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
