Re: Question about asyncio and blocking operations
On 28 Jan 2016 22:52, "Ian Kelly" wrote:
>
> On Thu, Jan 28, 2016 at 2:23 PM, Maxime S wrote:
> >
> > 2016-01-28 17:53 GMT+01:00 Ian Kelly :
> >>
> >> On Thu, Jan 28, 2016 at 9:40 AM, Frank Millman wrote:
> >>
> >> > The caller requests some data from the database like this.
> >> >
> >> >     return_queue = asyncio.Queue()
> >> >     sql = 'SELECT ...'
> >> >     request_queue.put((return_queue, sql))
> >>
> >> Note that since this is a queue.Queue, the put call has the potential
> >> to block your entire event loop.
> >>
> >
> > Actually, I don't think you need an asyncio.Queue at all.
> >
> > You could use a simple deque as a buffer, and call fetchmany() when it
> > is empty, like that (untested):
>
> True. The asyncio Queue is really just a wrapper around a deque with
> an interface designed for use with the producer-consumer pattern. If
> the producer isn't a coroutine then it may not be appropriate.
>
> This seems like a nice suggestion. Caution is advised if multiple
> cursor methods are executed concurrently, since they would be in
> different threads and the underlying cursor may not be thread-safe.

Indeed, the run_in_executor call should probably be protected by an
asyncio.Lock. But it is a pretty strange idea to call two fetch*()
methods concurrently anyway.
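For context, a minimal sketch of what such a deque-buffered cursor wrapper
might look like, with the Lock protecting run_in_executor as suggested
above (untested; the class name, batch size, and method signatures are my
own assumptions, not the code from the original post):

import asyncio
from collections import deque

class AsyncCursor:
    """Hypothetical wrapper around a blocking DB-API cursor.

    Rows are buffered in a deque; fetchmany() only runs in the
    executor when the buffer is empty.
    """
    def __init__(self, cursor, *, loop=None, batch_size=100):
        self._cursor = cursor
        self._buffer = deque()
        self._loop = loop if loop is not None else asyncio.get_event_loop()
        self._batch_size = batch_size
        # Serialize executor calls in case two fetch*() calls overlap.
        self._lock = asyncio.Lock()

    @asyncio.coroutine
    def fetchone(self):
        if not self._buffer:
            with (yield from self._lock):
                if not self._buffer:  # re-check after acquiring the lock
                    rows = yield from self._loop.run_in_executor(
                        None, self._cursor.fetchmany, self._batch_size)
                    self._buffer.extend(rows)
        return self._buffer.popleft() if self._buffer else None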
Re: Anything better than asyncio.as_completed() and asyncio.wait() to manage execution of a large number of tasks?
2014-07-15 14:20 GMT+02:00 Valery Khamenya :
> Hi,
>
> both asyncio.as_completed() and asyncio.wait() work with lists only. No
> generators are accepted. Is there anything similar to those functions
> that pulls Tasks/Futures/coroutines one by one and processes them in a
> limited task pool?

Something like this (adapted from as_completed) should do the work:

import asyncio
from concurrent import futures

def parallelize(tasks, *, loop=None, max_workers=5, timeout=None):
    loop = loop if loop is not None else asyncio.get_event_loop()
    workers = []
    pending = set()
    done = asyncio.Queue(maxsize=max_workers)
    exhausted = False

    @asyncio.coroutine
    def _worker():
        nonlocal exhausted
        while not exhausted:
            try:
                t = next(tasks)
                pending.add(t)
                yield from t
                yield from done.put(t)
                pending.remove(t)
            except StopIteration:
                exhausted = True

    def _on_timeout():
        for f in workers:
            f.cancel()
        workers.clear()
        # Wake up _wait_for_one()
        done.put_nowait(None)

    @asyncio.coroutine
    def _wait_for_one():
        f = yield from done.get()
        if f is None:
            raise futures.TimeoutError()
        return f.result()

    workers = [asyncio.async(_worker()) for i in range(max_workers)]

    timeout_handle = None
    if workers and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)

    while not exhausted or pending or not done.empty():
        yield _wait_for_one()

    # Guard the cancel: the original version called cancel()
    # unconditionally, which raises NameError when timeout is None.
    if timeout_handle is not None:
        timeout_handle.cancel()
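For anyone trying this, a rough usage sketch of my own (not from the
thread; fetch() and the inputs are placeholders). parallelize() is itself
a plain generator yielding coroutines, so you consume it the same way as
as_completed(); asyncio.async() is the old spelling of what later became
ensure_future():

@asyncio.coroutine
def fetch(url):
    # Stand-in for real work, e.g. an HTTP request.
    yield from asyncio.sleep(0.1)
    return url

@asyncio.coroutine
def main():
    # parallelize() calls next() on its argument, so pass an iterator
    # of Tasks/Futures (a generator expression works).
    tasks = (asyncio.async(fetch(u)) for u in ['a', 'b', 'c', 'd'])
    for f in parallelize(tasks, max_workers=2):
        result = yield from f
        print(result)

asyncio.get_event_loop().run_until_complete(main())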
Re: asyncio with map&reduce flavor and without flooding the event loop
2014-08-03 16:01 GMT+02:00 Valery Khamenya :
> Hi all
>
> [snip]
>
> Consider a task like crawling the web starting from some web-sites. Each
> site leads to the generation of new download tasks in exponential(!)
> progression. However, we want neither to flood the event loop nor to
> overload our network. We'd like to control the task flow. This is what I
> achieve well with a modification of Maxime's nice solution proposed here:
> https://mail.python.org/pipermail/python-list/2014-July/675048.html
>
> Well, but I'd also need a very natural thing, a kind of map() & reduce(),
> or functools.reduce() if we are on Python 3 already. That is, I'd need to
> call a "summarizing" function for all the download tasks completed on
> links from a page. This is where I fail :(

Hi Valery,

With the modified as_completed, you can write map and reduce primitives
quite naturally. It could look like this:

def async_map(corofunc, *iterables):
    """
    Equivalent to map(corofunc, *iterables), except that corofunc must
    be a coroutine function and is executed asynchronously.

    This is not a coroutine, just a normal generator yielding Task
    instances.
    """
    for args in zip(*iterables):
        yield asyncio.async(corofunc(*args))

@asyncio.coroutine
def async_reduce(corofunc, futures, initial=0):
    """
    Equivalent to functools.reduce(corofunc, [f.result() for f in futures]),
    except that corofunc must be a coroutine function and future results
    can be evaluated out of order.

    This function is a coroutine.
    """
    result = initial
    for f in as_completed(futures, max_workers=50):
        new_value = (yield from f)
        result = (yield from corofunc(result, new_value))
    return result

Best,
Maxime
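To make the call pattern concrete, a small usage sketch of my own (the
download() and add() coroutines and the URLs are hypothetical; it assumes
the modified as_completed from the July post, named parallelize there, is
available under that name):

import asyncio

@asyncio.coroutine
def download(url):
    # Stand-in for a real HTTP fetch.
    yield from asyncio.sleep(0.1)
    return len(url)

@asyncio.coroutine
def add(total, size):
    # The "summarizing" coroutine Valery asked for.
    return total + size

@asyncio.coroutine
def main():
    urls = ['http://a.example/', 'http://b.example/']
    # async_map lazily wraps each call in a Task; async_reduce folds
    # the results together as they complete, out of order.
    futures = async_map(download, urls)
    total = yield from async_reduce(add, futures, initial=0)
    print('total bytes:', total)

asyncio.get_event_loop().run_until_complete(main())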
