commit: d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Thu Apr 12 03:56:25 2018 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Fri Apr 13 07:10:10 2018 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df
Implement _PortageEventLoop.subprocess_exec (bug 649588) In python versions that support asyncio, this allows API consumers to use the asyncio.create_subprocess_exec() function with portage's internal event loop. Currently, subprocess.PIPE is not implemented because that would require an implementation of asyncio's private asyncio.unix_events._UnixReadPipeTransport class. However, it's possible to use pipes created with os.pipe() for stdin, stdout, and stderr, as demonstrated in the included unit tests. Bug: https://bugs.gentoo.org/649588 .../util/futures/asyncio/test_subprocess_exec.py | 163 +++++++++++++++++++++ pym/portage/util/futures/unix_events.py | 98 +++++++++++++ 2 files changed, 261 insertions(+) diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py new file mode 100644 index 000000000..d30f48c43 --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -0,0 +1,163 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import os + +from portage.process import find_binary +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.executor.fork import ForkExecutor +from portage.util.futures.unix_events import DefaultEventLoopPolicy +from _emerge.PipeReader import PipeReader + + +def reader(input_file, loop=None): + """ + Asynchronously read a binary input file. + + @param input_file: binary input file + @type input_file: file + @param loop: event loop + @type loop: EventLoop + @return: bytes + @rtype: asyncio.Future (or compatible) + """ + loop = loop or asyncio.get_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + future = loop.create_future() + _Reader(future, input_file, loop) + return future + + +class _Reader(object): + def __init__(self, future, input_file, loop): + self._future = future + self._pipe_reader = PipeReader( + input_files={'input_file':input_file}, scheduler=loop._loop) + + self._future.add_done_callback(self._cancel_callback) + self._pipe_reader.addExitListener(self._eof) + self._pipe_reader.start() + + def _cancel_callback(self, future): + if future.cancelled(): + self._cancel() + + def _eof(self, pipe_reader): + self._pipe_reader = None + self._future.set_result(pipe_reader.getvalue()) + + def _cancel(self): + if self._pipe_reader is not None and self._pipe_reader.poll() is None: + self._pipe_reader.removeExitListener(self._eof) + self._pipe_reader.cancel() + self._pipe_reader = None + + +class SubprocessExecTestCase(TestCase): + def _run_test(self, test): + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + + try: + test(asyncio.get_event_loop()) + finally: + asyncio.set_event_loop_policy(initial_policy) + + def testEcho(self): + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + args_tuple = (b'hello', b'world') + echo_binary = find_binary("echo") + self.assertNotEqual(echo_binary, None) + echo_binary = echo_binary.encode() + + # Use os.pipe(), since this loop does not implement the + # ReadTransport necessary for subprocess.PIPE support. + stdout_pr, stdout_pw = os.pipe() + stdout_pr = os.fdopen(stdout_pr, 'rb', 0) + stdout_pw = os.fdopen(stdout_pw, 'wb', 0) + files = [stdout_pr, stdout_pw] + + def test(loop): + output = None + try: + with open(os.devnull, 'rb', 0) as devnull: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + echo_binary, *args_tuple, + stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) + + # This belongs exclusively to the subprocess now. + stdout_pw.close() + + output = asyncio.ensure_future( + reader(stdout_pr, loop=loop), loop=loop) + + self.assertEqual( + loop.run_until_complete(proc.wait()), os.EX_OK) + self.assertEqual( + tuple(loop.run_until_complete(output).split()), args_tuple) + finally: + if output is not None and not output.done(): + output.cancel() + for f in files: + f.close() + + self._run_test(test) + + def testCat(self): + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + stdin_data = b'hello world' + cat_binary = find_binary("cat") + self.assertNotEqual(cat_binary, None) + cat_binary = cat_binary.encode() + + # Use os.pipe(), since this loop does not implement the + # ReadTransport necessary for subprocess.PIPE support. + stdout_pr, stdout_pw = os.pipe() + stdout_pr = os.fdopen(stdout_pr, 'rb', 0) + stdout_pw = os.fdopen(stdout_pw, 'wb', 0) + + stdin_pr, stdin_pw = os.pipe() + stdin_pr = os.fdopen(stdin_pr, 'rb', 0) + stdin_pw = os.fdopen(stdin_pw, 'wb', 0) + + files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw] + + def test(loop): + output = None + try: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + cat_binary, + stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw)) + + # These belong exclusively to the subprocess now. + stdout_pw.close() + stdin_pr.close() + + output = asyncio.ensure_future( + reader(stdout_pr, loop=loop), loop=loop) + + with ForkExecutor(loop=loop) as executor: + writer = asyncio.ensure_future(loop.run_in_executor( + executor, stdin_pw.write, stdin_data), loop=loop) + + # This belongs exclusively to the writer now. + stdin_pw.close() + loop.run_until_complete(writer) + + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + self.assertEqual(loop.run_until_complete(output), stdin_data) + finally: + if output is not None and not output.done(): + output.cancel() + for f in files: + f.close() + + self._run_test(test) diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index 5434cd942..1abc420e1 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -7,11 +7,15 @@ __all__ = ( ) try: + from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher except ImportError: _AbstractChildWatcher = object + _BaseSubprocessTransport = object +import functools import os +import subprocess from portage.util._eventloop.global_event_loop import ( global_event_loop as _global_event_loop, @@ -77,6 +81,100 @@ class _PortageEventLoop(events.AbstractEventLoop): """ return asyncio.Task(coro, loop=self) + def subprocess_exec(self, protocol_factory, program, *args, **kwargs): + """ + Run subprocesses asynchronously using the subprocess module. + + @type protocol_factory: callable + @param protocol_factory: must instantiate a subclass of the + asyncio.SubprocessProtocol class + @type program: str or bytes + @param program: the program to execute + @type args: str or bytes + @param args: program's arguments + @type kwargs: varies + @param kwargs: subprocess.Popen parameters + @rtype: asyncio.Future + @return: Returns a pair of (transport, protocol), where transport + is an instance of BaseSubprocessTransport + """ + + # python2.7 does not allow arguments with defaults after *args + stdin = kwargs.pop('stdin', subprocess.PIPE) + stdout = kwargs.pop('stdout', subprocess.PIPE) + stderr = kwargs.pop('stderr', subprocess.PIPE) + + if subprocess.PIPE in (stdin, stdout, stderr): + # Requires connect_read/write_pipe implementation, for example + # see asyncio.unix_events._UnixReadPipeTransport. + raise NotImplementedError() + + universal_newlines = kwargs.pop('universal_newlines', False) + shell = kwargs.pop('shell', False) + bufsize = kwargs.pop('bufsize', 0) + + if universal_newlines: + raise ValueError("universal_newlines must be False") + if shell: + raise ValueError("shell must be False") + if bufsize != 0: + raise ValueError("bufsize must be 0") + popen_args = (program,) + args + for arg in popen_args: + if not isinstance(arg, (str, bytes)): + raise TypeError("program arguments must be " + "a bytes or text string, not %s" + % type(arg).__name__) + result = self.create_future() + self._make_subprocess_transport( + result, protocol_factory(), popen_args, False, stdin, stdout, stderr, + bufsize, **kwargs) + return result + + def _make_subprocess_transport(self, result, protocol, args, shell, + stdin, stdout, stderr, bufsize, extra=None, **kwargs): + waiter = self.create_future() + transp = _UnixSubprocessTransport(self, + protocol, args, shell, stdin, stdout, stderr, bufsize, + waiter=waiter, extra=extra, + **kwargs) + + self._loop._asyncio_child_watcher.add_child_handler( + transp.get_pid(), self._child_watcher_callback, transp) + + waiter.add_done_callback(functools.partial( + self._subprocess_transport_callback, transp, protocol, result)) + + def _subprocess_transport_callback(self, transp, protocol, result, waiter): + if waiter.exception() is None: + result.set_result((transp, protocol)) + else: + transp.close() + wait_transp = asyncio.ensure_future(transp._wait(), loop=self) + wait_transp.add_done_callback( + functools.partial(self._subprocess_transport_failure, + result, waiter.exception())) + + def _child_watcher_callback(self, pid, returncode, transp): + self.call_soon_threadsafe(transp._process_exited, returncode) + + def _subprocess_transport_failure(self, result, exception, wait_transp): + result.set_exception(wait_transp.exception() or exception) + + +class _UnixSubprocessTransport(_BaseSubprocessTransport): + """ + This is identical to the standard library's private + asyncio.unix_events._UnixSubprocessTransport class, except that + subprocess.PIPE is not implemented for stdin, since that would + require connect_write_pipe support in the event loop. For example, + see the asyncio.unix_events._UnixWritePipeTransport class. + """ + def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): + self._proc = subprocess.Popen( + args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, + universal_newlines=False, bufsize=bufsize, **kwargs) + class AbstractChildWatcher(_AbstractChildWatcher): def add_child_handler(self, pid, callback, *args):
