This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ee7c257b9fb [SPARK-48634][PYTHON][CONNECT] Avoid statically 
initialize threadpool at ExecutePlanResponseReattachableIterator
6ee7c257b9fb is described below

commit 6ee7c257b9fb47400cb447dca7c6cd37364476f3
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Jun 19 09:13:00 2024 +0900

    [SPARK-48634][PYTHON][CONNECT] Avoid statically initialize threadpool at 
ExecutePlanResponseReattachableIterator
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to stop statically initializing the `ThreadPool` behind
    `ExecutePlanResponseReattachableIterator._release_thread_pool`; the pool is
    now created lazily, on first use.
    
    ### Why are the changes needed?
    
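    Because the pool is initialized statically (when the class is defined), it
    might be created as a side effect of merely loading the class, for example
    during pickling, and creating it can fail in some environments: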
    
    ```
        _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() 
if os.cpu_count() else 8)
      File "/usr/lib/python3.10/multiprocessing/pool.py", line 930, in __init__
        Pool.__init__(self, processes, initializer, initargs)
      File "/usr/lib/python3.10/multiprocessing/pool.py", line 196, in __init__
        self._change_notifier = self._ctx.SimpleQueue()
      File "/usr/lib/python3.10/multiprocessing/context.py", line 113, in 
SimpleQueue
        return SimpleQueue(ctx=self.get_context())
      File "/usr/lib/python3.10/multiprocessing/queues.py", line 341, in 
__init__
        self._rlock = ctx.Lock()
      File "/usr/lib/python3.10/multiprocessing/context.py", line 68, in Lock
        return Lock(ctx=self.get_context())
      File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 162, in 
__init__
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
      File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 57, in 
__init__
        sl = self._semlock = _multiprocessing.SemLock(
    PermissionError: [Errno 13] Permission denied
    ```
    
    Fixing that failure would otherwise require changes at the OS level.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, this could potentially trigger random job failures in some
    environments, such as Ubuntu.
    
    ### How was this patch tested?
    
    Manually tested.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46993 from HyukjinKwon/make-thread.
    
    Lead-authored-by: Hyukjin Kwon <[email protected]>
    Co-authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/client/core.py     |  2 +-
 python/pyspark/sql/connect/client/reattach.py | 47 +++++++++++++++++----------
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index f3bbab69f271..efc76bb99f56 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1108,7 +1108,7 @@ class SparkConnectClient(object):
         """
         Close the channel.
         """
-        ExecutePlanResponseReattachableIterator.shutdown()
+        ExecutePlanResponseReattachableIterator.shutdown_threadpool()
         self._channel.close()
         self._closed = True
 
diff --git a/python/pyspark/sql/connect/client/reattach.py 
b/python/pyspark/sql/connect/client/reattach.py
index cc50e5892631..c20d2b6e2e83 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -58,28 +58,41 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
     # Lock to manage the pool
     _lock: ClassVar[RLock] = RLock()
-    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if 
os.cpu_count() else 8)
+    _release_thread_pool_instance: Optional[ThreadPool] = None
+
+    @classmethod  # type: ignore[misc]
+    @property
+    def _release_thread_pool(cls) -> ThreadPool:
+        # Perform a first check outside the critical path.
+        if cls._release_thread_pool_instance is not None:
+            return cls._release_thread_pool_instance
+        with cls._lock:
+            if cls._release_thread_pool_instance is None:
+                cls._release_thread_pool_instance = ThreadPool(
+                    os.cpu_count() if os.cpu_count() else 8
+                )
+            return cls._release_thread_pool_instance
 
     @classmethod
-    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+    def shutdown_threadpool(cls: 
Type["ExecutePlanResponseReattachableIterator"]) -> None:
         """
         When the channel is closed, this method will be called before, to make 
sure all
         outstanding calls are closed.
         """
         with cls._lock:
-            if cls._release_thread_pool is not None:
-                cls._release_thread_pool.close()
-                cls._release_thread_pool.join()
-                cls._release_thread_pool = None
+            if cls._release_thread_pool_instance is not None:
+                cls._release_thread_pool.close()  # type: ignore[attr-defined]
+                cls._release_thread_pool.join()  # type: ignore[attr-defined]
+                cls._release_thread_pool_instance = None
 
-    @classmethod
-    def _initialize_pool_if_necessary(cls: 
Type["ExecutePlanResponseReattachableIterator"]) -> None:
+    def shutdown(self: "ExecutePlanResponseReattachableIterator") -> None:
         """
-        If the processing pool for the release calls is None, initialize the 
pool exactly once.
+        When the channel is closed, this method will be called before, to make 
sure all
+        outstanding calls are closed, and mark this iterator is shutdown.
         """
-        with cls._lock:
-            if cls._release_thread_pool is None:
-                cls._release_thread_pool = ThreadPool(os.cpu_count() if 
os.cpu_count() else 8)
+        with self._lock:
+            self.shutdown_threadpool()
+            self._is_shutdown = True
 
     def __init__(
         self,
@@ -88,7 +101,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
         retrying: Callable[[], Retrying],
         metadata: Iterable[Tuple[str, str]],
     ):
-        ExecutePlanResponseReattachableIterator._initialize_pool_if_necessary()
+        self._is_shutdown = False
         self._request = request
         self._retrying = retrying
         if request.operation_id:
@@ -206,8 +219,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
-        if ExecutePlanResponseReattachableIterator._release_thread_pool is not 
None:
-            
ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
+        if not self._is_shutdown:
+            self._release_thread_pool.apply_async(target)
 
     def _release_all(self) -> None:
         """
@@ -230,8 +243,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
-        if ExecutePlanResponseReattachableIterator._release_thread_pool is not 
None:
-            
ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
+        if not self._is_shutdown:
+            self._release_thread_pool.apply_async(target)
         self._result_complete = True
 
     def _call_iter(self, iter_fun: Callable) -> Any:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to