commit:     d92d69a52b2b127a0934656163f7075015ef7c85
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Oct 17 19:05:07 2023 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Oct 17 19:14:54 2023 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=d92d69a5

MergeProcess: Use multiprocessing.Pipe to decouple fd_pipes

Use multiprocessing.Pipe to decouple from the fd_pipes implementation
since that currently only works for the multiprocessing "fork" start
method.

Bug: https://bugs.gentoo.org/915903
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/dbapi/_MergeProcess.py | 25 ++++++++++---------------
 lib/portage/dbapi/vartree.py       |  2 +-
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/lib/portage/dbapi/_MergeProcess.py b/lib/portage/dbapi/_MergeProcess.py
index c1085270da..cf8e6513d1 100644
--- a/lib/portage/dbapi/_MergeProcess.py
+++ b/lib/portage/dbapi/_MergeProcess.py
@@ -96,7 +96,7 @@ class MergeProcess(ForkProcess):
             self._locked_vdb = False
 
     def _elog_output_handler(self):
-        output = self._read_buf(self._elog_reader_fd)
+        output = self._read_buf(self._elog_reader_fd.fileno())
         if output:
             lines = _unicode_decode(output).split("\n")
             if len(lines) == 1:
@@ -112,8 +112,8 @@ class MergeProcess(ForkProcess):
                     reporter(msg, phase=phase, key=key, out=out)
 
         elif output is not None:  # EIO/POLLHUP
-            self.scheduler.remove_reader(self._elog_reader_fd)
-            os.close(self._elog_reader_fd)
+            self.scheduler.remove_reader(self._elog_reader_fd.fileno())
+            self._elog_reader_fd.close()
             self._elog_reader_fd = None
             return False
 
@@ -136,16 +136,15 @@ class MergeProcess(ForkProcess):
         post-fork actions.
         """
 
-        elog_reader_fd, elog_writer_fd = os.pipe()
+        elog_reader_fd, elog_writer_fd = multiprocessing.Pipe(duplex=False)
 
         fcntl.fcntl(
-            elog_reader_fd,
+            elog_reader_fd.fileno(),
             fcntl.F_SETFL,
-            fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK,
+            fcntl.fcntl(elog_reader_fd.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK,
         )
 
         mtime_reader, mtime_writer = multiprocessing.Pipe(duplex=False)
-        fd_pipes[mtime_writer.fileno()] = mtime_writer.fileno()
         self.scheduler.add_reader(mtime_reader.fileno(), self._mtime_handler)
         self._mtime_reader = mtime_reader
 
@@ -166,8 +165,7 @@ class MergeProcess(ForkProcess):
             pipe=elog_writer_fd,
             mtime_pipe=mtime_writer,
         )
-        fd_pipes[elog_writer_fd] = elog_writer_fd
-        self.scheduler.add_reader(elog_reader_fd, self._elog_output_handler)
+        self.scheduler.add_reader(elog_reader_fd.fileno(), self._elog_output_handler)
 
         # If a concurrent emerge process tries to install a package
         # in the same SLOT as this one at the same time, there is an
@@ -185,7 +183,6 @@ class MergeProcess(ForkProcess):
         self.target = functools.partial(
             self._target,
             self._counter,
-            self._elog_reader_fd,
             self._dblink,
             self.infloc,
             self.mydbapi,
@@ -198,7 +195,7 @@ class MergeProcess(ForkProcess):
         )
 
         pids = super()._spawn(args, fd_pipes, **kwargs)
-        os.close(elog_writer_fd)
+        elog_writer_fd.close()
         mtime_writer.close()
         self._buf = ""
         self._elog_keys = set()
@@ -217,7 +214,6 @@ class MergeProcess(ForkProcess):
     @staticmethod
     def _target(
         counter,
-        elog_reader_fd,
         mylink,
         infloc,
         mydbapi,
@@ -231,7 +227,6 @@ class MergeProcess(ForkProcess):
         """
         TODO: Make all arguments picklable for the multiprocessing spawn start method.
         """
-        os.close(elog_reader_fd)
         portage.output.havecolor = not no_color(settings)
         # Avoid wastful updates of the vdb cache.
         vardb._flush_cache_enabled = False
@@ -301,8 +296,8 @@ class MergeProcess(ForkProcess):
 
         self._unlock_vdb()
         if self._elog_reader_fd is not None:
-            self.scheduler.remove_reader(self._elog_reader_fd)
-            os.close(self._elog_reader_fd)
+            self.scheduler.remove_reader(self._elog_reader_fd.fileno())
+            self._elog_reader_fd.close()
             self._elog_reader_fd = None
         if self._elog_keys is not None:
             for key in self._elog_keys:

diff --git a/lib/portage/dbapi/vartree.py b/lib/portage/dbapi/vartree.py
index 835cbb8092..88fc525771 100644
--- a/lib/portage/dbapi/vartree.py
+++ b/lib/portage/dbapi/vartree.py
@@ -4198,7 +4198,7 @@ class dblink:
             if str_buffer:
                 str_buffer = _unicode_encode("".join(str_buffer))
                 while str_buffer:
-                    str_buffer = str_buffer[os.write(self._pipe, str_buffer) :]
+                    str_buffer = str_buffer[os.write(self._pipe.fileno(), str_buffer) :]
 
     def _emerge_log(self, msg):
         emergelog(False, msg)

Reply via email to