commit: d92d69a52b2b127a0934656163f7075015ef7c85 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Tue Oct 17 19:05:07 2023 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Tue Oct 17 19:14:54 2023 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=d92d69a5
MergeProcess: Use multiprocessing.Pipe to decouple fd_pipes Use multiprocessing.Pipe to decouple from the fd_pipes implementation since that currently only works for the multiprocessing "fork" start method. Bug: https://bugs.gentoo.org/915903 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/portage/dbapi/_MergeProcess.py | 25 ++++++++++--------------- lib/portage/dbapi/vartree.py | 2 +- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/lib/portage/dbapi/_MergeProcess.py b/lib/portage/dbapi/_MergeProcess.py index c1085270da..cf8e6513d1 100644 --- a/lib/portage/dbapi/_MergeProcess.py +++ b/lib/portage/dbapi/_MergeProcess.py @@ -96,7 +96,7 @@ class MergeProcess(ForkProcess): self._locked_vdb = False def _elog_output_handler(self): - output = self._read_buf(self._elog_reader_fd) + output = self._read_buf(self._elog_reader_fd.fileno()) if output: lines = _unicode_decode(output).split("\n") if len(lines) == 1: @@ -112,8 +112,8 @@ class MergeProcess(ForkProcess): reporter(msg, phase=phase, key=key, out=out) elif output is not None: # EIO/POLLHUP - self.scheduler.remove_reader(self._elog_reader_fd) - os.close(self._elog_reader_fd) + self.scheduler.remove_reader(self._elog_reader_fd.fileno()) + self._elog_reader_fd.close() self._elog_reader_fd = None return False @@ -136,16 +136,15 @@ class MergeProcess(ForkProcess): post-fork actions. """ - elog_reader_fd, elog_writer_fd = os.pipe() + elog_reader_fd, elog_writer_fd = multiprocessing.Pipe(duplex=False) fcntl.fcntl( - elog_reader_fd, + elog_reader_fd.fileno(), fcntl.F_SETFL, - fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK, + fcntl.fcntl(elog_reader_fd.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK, ) mtime_reader, mtime_writer = multiprocessing.Pipe(duplex=False) - fd_pipes[mtime_writer.fileno()] = mtime_writer.fileno() self.scheduler.add_reader(mtime_reader.fileno(), self._mtime_handler) self._mtime_reader = mtime_reader @@ -166,8 +165,7 @@ class MergeProcess(ForkProcess): pipe=elog_writer_fd, mtime_pipe=mtime_writer, ) - fd_pipes[elog_writer_fd] = elog_writer_fd - self.scheduler.add_reader(elog_reader_fd, self._elog_output_handler) + self.scheduler.add_reader(elog_reader_fd.fileno(), self._elog_output_handler) # If a concurrent emerge process tries to install a package # in the same SLOT as this one at the same time, there is an @@ -185,7 +183,6 @@ class MergeProcess(ForkProcess): self.target = functools.partial( self._target, self._counter, - self._elog_reader_fd, self._dblink, self.infloc, self.mydbapi, @@ -198,7 +195,7 @@ class MergeProcess(ForkProcess): ) pids = super()._spawn(args, fd_pipes, **kwargs) - os.close(elog_writer_fd) + elog_writer_fd.close() mtime_writer.close() self._buf = "" self._elog_keys = set() @@ -217,7 +214,6 @@ class MergeProcess(ForkProcess): @staticmethod def _target( counter, - elog_reader_fd, mylink, infloc, mydbapi, @@ -231,7 +227,6 @@ class MergeProcess(ForkProcess): """ TODO: Make all arguments picklable for the multiprocessing spawn start method. """ - os.close(elog_reader_fd) portage.output.havecolor = not no_color(settings) # Avoid wastful updates of the vdb cache. vardb._flush_cache_enabled = False @@ -301,8 +296,8 @@ class MergeProcess(ForkProcess): self._unlock_vdb() if self._elog_reader_fd is not None: - self.scheduler.remove_reader(self._elog_reader_fd) - os.close(self._elog_reader_fd) + self.scheduler.remove_reader(self._elog_reader_fd.fileno()) + self._elog_reader_fd.close() self._elog_reader_fd = None if self._elog_keys is not None: for key in self._elog_keys: diff --git a/lib/portage/dbapi/vartree.py b/lib/portage/dbapi/vartree.py index 835cbb8092..88fc525771 100644 --- a/lib/portage/dbapi/vartree.py +++ b/lib/portage/dbapi/vartree.py @@ -4198,7 +4198,7 @@ class dblink: if str_buffer: str_buffer = _unicode_encode("".join(str_buffer)) while str_buffer: - str_buffer = str_buffer[os.write(self._pipe, str_buffer) :] + str_buffer = str_buffer[os.write(self._pipe.fileno(), str_buffer) :] def _emerge_log(self, msg): emergelog(False, msg)
