commit: e43f6c583ed9205abbdcb11340c81d7dd97ccc11
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Feb 25 23:19:58 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Feb 28 17:22:20 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=e43f6c58
Add iter_completed convenience function (bug 648790)
The iter_completed function is similar to asyncio.as_completed, but
takes an iterator of futures as input, and includes support for
max_jobs and max_load parameters. The default values for max_jobs
and max_load correspond to multiprocessing.cpu_count().
Example usage for async_aux_get:
import portage
from portage.util.futures.iter_completed import iter_completed
portdb = portage.portdb
# aux_get has many inputs, and the same cpv can exist in multiple
# repositories, so the caller is responsible for mapping futures
# back to their aux_get inputs
future_cpv = {}
def future_generator():
for cpv in portdb.cp_list('sys-apps/portage'):
future = portdb.async_aux_get(cpv, portage.auxdbkeys)
future_cpv[id(future)] = cpv
yield future
for future in iter_completed(future_generator()):
cpv = future_cpv.pop(id(future))
try:
result = future.result()
except KeyError as e:
# aux_get failed
print('error:', cpv, e)
else:
print(cpv, result)
See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
Bug: https://bugs.gentoo.org/648790
.../tests/util/futures/test_iter_completed.py | 50 ++++++++++
pym/portage/util/_async/AsyncTaskFuture.py | 31 +++++++
pym/portage/util/futures/iter_completed.py | 63 +++++++++++++
pym/portage/util/futures/wait.py | 102 +++++++++++++++++++++
4 files changed, 246 insertions(+)
diff --git a/pym/portage/tests/util/futures/test_iter_completed.py
b/pym/portage/tests/util/futures/test_iter_completed.py
new file mode 100644
index 000000000..d0a7dbb45
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -0,0 +1,50 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import time
+from portage.tests import TestCase
+from portage.util._async.ForkProcess import ForkProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.iter_completed import iter_completed
+
+
+class SleepProcess(ForkProcess):
+ __slots__ = ('future', 'seconds')
+ def _start(self):
+ self.addExitListener(self._future_done)
+ ForkProcess._start(self)
+
+ def _future_done(self, task):
+ self.future.set_result(self.seconds)
+
+ def _run(self):
+ time.sleep(self.seconds)
+
+
+class IterCompletedTestCase(TestCase):
+
+ def testIterCompleted(self):
+
+ # Mark this as todo, since we don't want to fail if heavy system
+ # load causes the tasks to finish in an unexpected order.
+ self.todo = True
+
+ loop = global_event_loop()
+ tasks = [
+ SleepProcess(seconds=0.200),
+ SleepProcess(seconds=0.100),
+ SleepProcess(seconds=0.001),
+ ]
+
+ expected_order = sorted(task.seconds for task in tasks)
+
+ def future_generator():
+ for task in tasks:
+ task.future = loop.create_future()
+ task.scheduler = loop
+ task.start()
+ yield task.future
+
+ for seconds, future in zip(expected_order,
iter_completed(future_generator(),
+ max_jobs=True, max_load=None, loop=loop)):
+ self.assertEqual(seconds, future.result())
diff --git a/pym/portage/util/_async/AsyncTaskFuture.py
b/pym/portage/util/_async/AsyncTaskFuture.py
new file mode 100644
index 000000000..ee39183fe
--- /dev/null
+++ b/pym/portage/util/_async/AsyncTaskFuture.py
@@ -0,0 +1,31 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+import signal
+
+from _emerge.AsynchronousTask import AsynchronousTask
+
+
+class AsyncTaskFuture(AsynchronousTask):
+ """
+ Wraps a Future in an AsynchronousTask, which is useful for
+ scheduling with TaskScheduler.
+ """
+ __slots__ = ('future', 'scheduler')
+ def _start(self):
+ self.future.add_done_callback(self._done_callback)
+
+ def _cancel(self):
+ if not self.future.done():
+ self.future.cancel()
+
+ def _done_callback(self, future):
+ if future.cancelled():
+ self.cancelled = True
+ self.returncode = -signal.SIGINT
+ elif future.exception() is None:
+ self.returncode = os.EX_OK
+ else:
+ self.returncode = 1
+ self.wait()
diff --git a/pym/portage/util/futures/iter_completed.py
b/pym/portage/util/futures/iter_completed.py
new file mode 100644
index 000000000..1050b6fa7
--- /dev/null
+++ b/pym/portage/util/futures/iter_completed.py
@@ -0,0 +1,63 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import multiprocessing
+
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.wait import wait, FIRST_COMPLETED
+
+
+def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+ """
+ This is similar to asyncio.as_completed, but takes an iterator of
+ futures as input, and includes support for max_jobs and max_load
+ parameters.
+
+ @param futures: iterator of asyncio.Future (or compatible)
+ @type futures: iterator
+ @param max_jobs: max number of futures to process concurrently (default
+ is multiprocessing.cpu_count())
+ @type max_jobs: int
+ @param max_load: max load allowed when scheduling a new future,
+ otherwise schedule no more than 1 future at a time (default
+ is multiprocessing.cpu_count())
+ @type max_load: int or float
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: iterator of futures that are done
+ @rtype: iterator
+ """
+ loop = loop or global_event_loop()
+ max_jobs = max_jobs or multiprocessing.cpu_count()
+ max_load = max_load or multiprocessing.cpu_count()
+
+ future_map = {}
+ def task_generator():
+ for future in futures:
+ future_map[id(future)] = future
+ yield AsyncTaskFuture(future=future)
+
+ scheduler = TaskScheduler(
+ task_generator(),
+ max_jobs=max_jobs,
+ max_load=max_load,
+ event_loop=loop)
+
+ try:
+ scheduler.start()
+
+ # scheduler should ensure that future_map is non-empty until
+ # task_generator is exhausted
+ while future_map:
+ done, pending = loop.run_until_complete(
+ wait(*list(future_map.values()),
return_when=FIRST_COMPLETED))
+ for future in done:
+ del future_map[id(future)]
+ yield future
+
+ finally:
+ # cleanup in case of interruption by SIGINT, etc
+ scheduler.cancel()
+ scheduler.wait()
diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py
new file mode 100644
index 000000000..3f0bdbff5
--- /dev/null
+++ b/pym/portage/util/futures/wait.py
@@ -0,0 +1,102 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+# Reuse asyncio's return_when constants when that module can be imported;
+# otherwise define string stand-ins under the same names.
+try:
+ from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
+except ImportError:
+ ALL_COMPLETED = 'ALL_COMPLETED'
+ FIRST_COMPLETED ='FIRST_COMPLETED'
+ FIRST_EXCEPTION = 'FIRST_EXCEPTION'
+
+from portage.util._eventloop.global_event_loop import global_event_loop
+
+
+# Use **kwargs since python2.7 does not allow arguments with defaults
+# to follow *futures.
+def wait(*futures, **kwargs):
+ """
+ Use portage's internal EventLoop to emulate asyncio.wait:
+ https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
+
+ @param future: future to wait for
+ @type future: asyncio.Future (or compatible)
+ @param timeout: number of seconds to wait (wait indefinitely if
+ not specified)
+ @type timeout: int or float
+ @param return_when: indicates when this function should return, must
+ be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or
+ FIRST_EXCEPTION (default is ALL_COMPLETED)
+ @type return_when: object
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: tuple of (done, pending).
+ @rtype: asyncio.Future (or compatible)
+ """
+ if not futures:
+ raise TypeError("wait() missing 1 required positional argument:
'future'")
+ loop = kwargs.pop('loop', None)
+ timeout = kwargs.pop('timeout', None)
+ return_when = kwargs.pop('return_when', ALL_COMPLETED)
+ if kwargs:
+ raise TypeError("wait() got an unexpected keyword argument
'{}'".\
+ format(next(iter(kwargs))))
+ loop = loop or global_event_loop()
+ result_future = loop.create_future()
+ _Waiter(futures, timeout, return_when, result_future, loop)
+ return result_future
+
+
+class _Waiter(object):
+ def __init__(self, futures, timeout, return_when, result_future, loop):
+ self._futures = futures
+ self._completed = set()
+ self._exceptions = set()
+ self._return_when = return_when
+ self._result_future = result_future
+ self._loop = loop
+ self._ready = False
+ self._timeout = None
+ result_future.add_done_callback(self._cancel_callback)
+ for future in self._futures:
+ future.add_done_callback(self._done_callback)
+ if timeout is not None:
+ self._timeout = loop.call_later(timeout,
self._timeout_callback)
+
+ def _cancel_callback(self, future):
+ if future.cancelled():
+ self._ready_callback()
+
+ def _timeout_callback(self):
+ if not self._ready:
+ self._ready = True
+ self._ready_callback()
+
+ def _done_callback(self, future):
+ if future.cancelled() or future.exception() is None:
+ self._completed.add(id(future))
+ else:
+ self._exceptions.add(id(future))
+ if not self._ready and (
+ (self._return_when is FIRST_COMPLETED and
self._completed) or
+ (self._return_when is FIRST_EXCEPTION and
self._exceptions) or
+ (len(self._futures) == len(self._completed) +
len(self._exceptions))):
+ self._ready = True
+ # use call_soon in case multiple callbacks complete in
quick succession
+ self._loop.call_soon(self._ready_callback)
+
+ def _ready_callback(self):
+ if self._timeout is not None:
+ self._timeout.cancel()
+ self._timeout = None
+ if self._result_future.cancelled():
+ return
+ done = []
+ pending = []
+ done_ids = self._completed.union(self._exceptions)
+ for future in self._futures:
+ if id(future) in done_ids:
+ done.append(future)
+ else:
+ pending.append(future)
+ future.remove_done_callback(self._done_callback)
+ self._result_future.set_result((done, pending))