commit:     74bd5179abb2d6c22757ded884093f55cd01150e
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Oct 30 04:26:34 2025 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Oct 31 23:08:12 2025 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=74bd5179

ForkProcess: _send_fd_pipes thread safety

Since the _send_fd_pipes method runs in an executor thread,
use a lock to safely access file descriptors which may have
been closed by the main thread.

Bug: https://bugs.gentoo.org/965311
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/util/_async/ForkProcess.py | 88 +++++++++++++++++-----------------
 1 file changed, 45 insertions(+), 43 deletions(-)

diff --git a/lib/portage/util/_async/ForkProcess.py 
b/lib/portage/util/_async/ForkProcess.py
index 09b4fc9e07..c8c89c4b40 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2024 Gentoo Authors
+# Copyright 2012-2025 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import fcntl
@@ -6,6 +6,7 @@ import multiprocessing
 import warnings
 import signal
 import sys
+import threading
 
 from typing import Optional
 
@@ -28,6 +29,8 @@ class ForkProcess(SpawnProcess):
         "_child_connection",
         # Duplicate file descriptors for use by _send_fd_pipes background 
thread.
         "_fd_pipes",
+        "_send_fd_lock",
+        "_send_fd_files",
     )
 
     _file_names = ("connection", "slave_fd")
@@ -81,6 +84,7 @@ class ForkProcess(SpawnProcess):
                     stdout_fd = os.dup(self.fd_pipes[1])
 
             if self._HAVE_SEND_HANDLE:
+                self._send_fd_lock = threading.Lock()
                 if self.create_pipe is not False:
                     master_fd, slave_fd = self._pipe(self.fd_pipes)
                     self.fd_pipes[1] = slave_fd
@@ -97,7 +101,9 @@ class ForkProcess(SpawnProcess):
                     master_fd = None
                     slave_fd = None
 
-                self._files = self._files_dict(connection=connection, 
slave_fd=slave_fd)
+                self._send_fd_files = self._files_dict(
+                    connection=connection, slave_fd=slave_fd
+                )
 
                 # Create duplicate file descriptors in self._fd_pipes
                 # so that the caller is free to manage the lifecycle
@@ -113,10 +119,6 @@ class ForkProcess(SpawnProcess):
                             src_new, not bool(old_fdflags & fcntl.FD_CLOEXEC)
                         )
                     self._fd_pipes[dest] = fd_map[src]
-
-                asyncio.ensure_future(
-                    self._proc.wait(), self.scheduler
-                ).add_done_callback(self._close_fd_pipes)
             else:
                 master_fd = connection
 
@@ -124,44 +126,54 @@ class ForkProcess(SpawnProcess):
                 master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd
             )
 
-    def _close_fd_pipes(self, future):
+    def _close_send_fd_pipes(self):
         """
-        Cleanup self._fd_pipes if needed, since _send_fd_pipes could
+        Cleanup self._fd_pipes and self._send_fd_files if needed, since _main 
could
         have been cancelled.
         """
-        # future.result() raises asyncio.CancelledError if
-        # future.cancelled(), but that should not happen.
-        future.result()
-        if self._fd_pipes is not None:
-            for fd in set(self._fd_pipes.values()):
-                os.close(fd)
-            self._fd_pipes = None
+        if self._send_fd_lock is None:
+            return
+
+        with self._send_fd_lock:
+            if self._fd_pipes is not None:
+                for fd in set(self._fd_pipes.values()):
+                    os.close(fd)
+                self._fd_pipes = None
+
+            if self._send_fd_files:
+                if hasattr(self._send_fd_files, "connection"):
+                    self._send_fd_files.connection.close()
+                    del self._send_fd_files.connection
+                if hasattr(self._send_fd_files, "slave_fd"):
+                    if self._send_fd_files.slave_fd is not None:
+                        os.close(self._send_fd_files.slave_fd)
+                    del self._send_fd_files.slave_fd
 
     @property
     def _fd_pipes_send_handle(self):
         """Returns True if we have a connection to implement fd_pipes via 
send_handle."""
-        return bool(
-            self._HAVE_SEND_HANDLE
-            and self._files
-            and getattr(self._files, "connection", False)
-        )
+        return self._send_fd_lock is not None
 
     def _send_fd_pipes(self):
         """
         Communicate with _bootstrap to send fd_pipes via send_handle.
         This performs blocking IO, intended for invocation via run_in_executor.
         """
-        fd_list = list(set(self._fd_pipes.values()))
         try:
-            self._files.connection.send(
-                (self._fd_pipes, fd_list),
-            )
-            for fd in fd_list:
-                multiprocessing.reduction.send_handle(
-                    self._files.connection,
-                    fd,
-                    self.pid,
+            with self._send_fd_lock:
+                if self._fd_pipes is None:
+                    raise asyncio.CancelledError
+
+                fd_list = list(set(self._fd_pipes.values()))
+                self._send_fd_files.connection.send(
+                    (self._fd_pipes, fd_list),
                 )
+                for fd in fd_list:
+                    multiprocessing.reduction.send_handle(
+                        self._send_fd_files.connection,
+                        fd,
+                        self.pid,
+                    )
         except (BrokenPipeError, ConnectionResetError) as e:
             # This case is triggered by testAsynchronousLockWaitCancel
             # when the test case terminates the child process while
@@ -171,11 +183,6 @@ class ForkProcess(SpawnProcess):
             # child error should have gone to stderr.
             raise asyncio.CancelledError from e
 
-        # self._fd_pipes contains duplicates that must be closed.
-        for fd in fd_list:
-            os.close(fd)
-        self._fd_pipes = None
-
     async def _main(self, build_logger, pipe_logger, loop=None):
         try:
             if self._fd_pipes_send_handle:
@@ -183,18 +190,12 @@ class ForkProcess(SpawnProcess):
                     None,
                     self._send_fd_pipes,
                 )
+
+                # self._fd_pipes contains duplicates that must be closed.
+                self._close_send_fd_pipes()
         except asyncio.CancelledError:
             self._main_cancel(build_logger, pipe_logger)
             raise
-        finally:
-            if self._files:
-                if hasattr(self._files, "connection"):
-                    self._files.connection.close()
-                    del self._files.connection
-                if hasattr(self._files, "slave_fd"):
-                    if self._files.slave_fd is not None:
-                        os.close(self._files.slave_fd)
-                    del self._files.slave_fd
 
         await super()._main(build_logger, pipe_logger, loop=loop)
 
@@ -272,6 +273,7 @@ class ForkProcess(SpawnProcess):
         super()._unregister()
         if self._proc is not None:
             self._proc.terminate()
+        self._close_send_fd_pipes()
 
     @staticmethod
     def _bootstrap(child_connection, have_send_handle, fd_pipes, target, args, 
kwargs):

Reply via email to