commit: 74bd5179abb2d6c22757ded884093f55cd01150e
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Oct 30 04:26:34 2025 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Oct 31 23:08:12 2025 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=74bd5179
ForkProcess: _send_fd_pipes thread safety Since the _send_fd_pipes method runs in an executor thread, use a lock to safely access file descriptors which may have been closed by the main thread. Bug: https://bugs.gentoo.org/965311 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/portage/util/_async/ForkProcess.py | 88 +++++++++++++++++----------------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index 09b4fc9e07..c8c89c4b40 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -1,4 +1,4 @@ -# Copyright 2012-2024 Gentoo Authors +# Copyright 2012-2025 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import fcntl @@ -6,6 +6,7 @@ import multiprocessing import warnings import signal import sys +import threading from typing import Optional @@ -28,6 +29,8 @@ class ForkProcess(SpawnProcess): "_child_connection", # Duplicate file descriptors for use by _send_fd_pipes background thread. 
"_fd_pipes", + "_send_fd_lock", + "_send_fd_files", ) _file_names = ("connection", "slave_fd") @@ -81,6 +84,7 @@ class ForkProcess(SpawnProcess): stdout_fd = os.dup(self.fd_pipes[1]) if self._HAVE_SEND_HANDLE: + self._send_fd_lock = threading.Lock() if self.create_pipe is not False: master_fd, slave_fd = self._pipe(self.fd_pipes) self.fd_pipes[1] = slave_fd @@ -97,7 +101,9 @@ class ForkProcess(SpawnProcess): master_fd = None slave_fd = None - self._files = self._files_dict(connection=connection, slave_fd=slave_fd) + self._send_fd_files = self._files_dict( + connection=connection, slave_fd=slave_fd + ) # Create duplicate file descriptors in self._fd_pipes # so that the caller is free to manage the lifecycle @@ -113,10 +119,6 @@ class ForkProcess(SpawnProcess): src_new, not bool(old_fdflags & fcntl.FD_CLOEXEC) ) self._fd_pipes[dest] = fd_map[src] - - asyncio.ensure_future( - self._proc.wait(), self.scheduler - ).add_done_callback(self._close_fd_pipes) else: master_fd = connection @@ -124,44 +126,54 @@ class ForkProcess(SpawnProcess): master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd ) - def _close_fd_pipes(self, future): + def _close_send_fd_pipes(self): """ - Cleanup self._fd_pipes if needed, since _send_fd_pipes could + Cleanup self._fd_pipes and self._send_fd_files if needed, since _main could have been cancelled. """ - # future.result() raises asyncio.CancelledError if - # future.cancelled(), but that should not happen. 
- future.result() - if self._fd_pipes is not None: - for fd in set(self._fd_pipes.values()): - os.close(fd) - self._fd_pipes = None + if self._send_fd_lock is None: + return + + with self._send_fd_lock: + if self._fd_pipes is not None: + for fd in set(self._fd_pipes.values()): + os.close(fd) + self._fd_pipes = None + + if self._send_fd_files: + if hasattr(self._send_fd_files, "connection"): + self._send_fd_files.connection.close() + del self._send_fd_files.connection + if hasattr(self._send_fd_files, "slave_fd"): + if self._send_fd_files.slave_fd is not None: + os.close(self._send_fd_files.slave_fd) + del self._send_fd_files.slave_fd @property def _fd_pipes_send_handle(self): """Returns True if we have a connection to implement fd_pipes via send_handle.""" - return bool( - self._HAVE_SEND_HANDLE - and self._files - and getattr(self._files, "connection", False) - ) + return self._send_fd_lock is not None def _send_fd_pipes(self): """ Communicate with _bootstrap to send fd_pipes via send_handle. This performs blocking IO, intended for invocation via run_in_executor. """ - fd_list = list(set(self._fd_pipes.values())) try: - self._files.connection.send( - (self._fd_pipes, fd_list), - ) - for fd in fd_list: - multiprocessing.reduction.send_handle( - self._files.connection, - fd, - self.pid, + with self._send_fd_lock: + if self._fd_pipes is None: + raise asyncio.CancelledError + + fd_list = list(set(self._fd_pipes.values())) + self._send_fd_files.connection.send( + (self._fd_pipes, fd_list), ) + for fd in fd_list: + multiprocessing.reduction.send_handle( + self._send_fd_files.connection, + fd, + self.pid, + ) except (BrokenPipeError, ConnectionResetError) as e: # This case is triggered by testAsynchronousLockWaitCancel # when the test case terminates the child process while @@ -171,11 +183,6 @@ class ForkProcess(SpawnProcess): # child error should have gone to stderr. raise asyncio.CancelledError from e - # self._fd_pipes contains duplicates that must be closed. 
- for fd in fd_list: - os.close(fd) - self._fd_pipes = None - async def _main(self, build_logger, pipe_logger, loop=None): try: if self._fd_pipes_send_handle: @@ -183,18 +190,12 @@ class ForkProcess(SpawnProcess): None, self._send_fd_pipes, ) + + # self._fd_pipes contains duplicates that must be closed. + self._close_send_fd_pipes() except asyncio.CancelledError: self._main_cancel(build_logger, pipe_logger) raise - finally: - if self._files: - if hasattr(self._files, "connection"): - self._files.connection.close() - del self._files.connection - if hasattr(self._files, "slave_fd"): - if self._files.slave_fd is not None: - os.close(self._files.slave_fd) - del self._files.slave_fd await super()._main(build_logger, pipe_logger, loop=loop) @@ -272,6 +273,7 @@ class ForkProcess(SpawnProcess): super()._unregister() if self._proc is not None: self._proc.terminate() + self._close_send_fd_pipes() @staticmethod def _bootstrap(child_connection, have_send_handle, fd_pipes, target, args, kwargs):
