Re: Question about asyncio and blocking operations

2016-01-29 Thread Maxime Steisel
On 28 Jan 2016 at 22:52, "Ian Kelly" wrote:
>
> On Thu, Jan 28, 2016 at 2:23 PM, Maxime S  wrote:
> >
> > 2016-01-28 17:53 GMT+01:00 Ian Kelly :
> >>
> >> On Thu, Jan 28, 2016 at 9:40 AM, Frank Millman wrote:
> >>
> >> > The caller requests some data from the database like this.
> >> >
> >> >return_queue = asyncio.Queue()
> >> >sql = 'SELECT ...'
> >> >request_queue.put((return_queue, sql))
> >>
> >> Note that since this is a queue.Queue, the put call has the potential
> >> to block your entire event loop.
> >>
> >
> > Actually, I don't think you actually need an asyncio.Queue.
> >
> > You could use a simple deque as a buffer, and call fetchmany() when it is
> > empty, like this (untested):
>
> True. The asyncio Queue is really just a wrapper around a deque with
> an interface designed for use with the producer-consumer pattern. If
> the producer isn't a coroutine then it may not be appropriate.
>
> This seems like a nice suggestion. Caution is advised if multiple
> cursor methods are executed concurrently since they would be in
> different threads and the underlying cursor may not be thread-safe.

Indeed, the run_in_executor call should probably be protected by an
asyncio.Lock.

But it is a pretty strange idea to call two fetch*() methods concurrently
anyway.
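
Putting the thread's pieces together, a minimal sketch could look like this
(untested; AsyncCursor, batch_size and fetchone() are names I made up, but
the deque buffer, the fetchmany() refill and the asyncio.Lock around
run_in_executor are the ideas discussed above):

import asyncio
from collections import deque

class AsyncCursor:
    """Buffer rows from a blocking DB-API cursor for use with asyncio."""

    def __init__(self, cursor, *, batch_size=50, loop=None):
        self._cursor = cursor          # assumed not thread-safe
        self._batch_size = batch_size
        self._loop = loop or asyncio.get_event_loop()
        self._buffer = deque()
        self._lock = asyncio.Lock()    # serializes executor calls

    @asyncio.coroutine
    def fetchone(self):
        if not self._buffer:
            with (yield from self._lock):
                if not self._buffer:   # re-check after waiting on the lock
                    rows = yield from self._loop.run_in_executor(
                        None, self._cursor.fetchmany, self._batch_size)
                    self._buffer.extend(rows)
        if not self._buffer:
            return None                # no more rows
        return self._buffer.popleft()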


Re: Anything better than asyncio.as_completed() and asyncio.wait() to manage execution of large amount of tasks?

2014-07-16 Thread Maxime Steisel
2014-07-15 14:20 GMT+02:00 Valery Khamenya :
> Hi,
>
> both asyncio.as_completed() and asyncio.wait() work with lists only. No
> generators are accepted. Is there anything similar to those functions that
> pulls Tasks/Futures/coroutines one by one and processes them in a limited
> task pool?


Something like this (adapted from as_completed) should do the job:

import asyncio
from concurrent import futures

def parallelize(tasks, *, loop=None, max_workers=5, timeout=None):
    """Like as_completed(), but limits the number of tasks running at once.

    `tasks` must be an iterator of futures/tasks; at most max_workers of
    them are awaited at any one time.
    """
    loop = loop if loop is not None else asyncio.get_event_loop()
    workers = []
    pending = set()
    done = asyncio.Queue(maxsize=max_workers)
    exhausted = False

    @asyncio.coroutine
    def _worker():
        nonlocal exhausted
        while not exhausted:
            try:
                t = next(tasks)
                pending.add(t)
                yield from t
                yield from done.put(t)
                pending.remove(t)
            except StopIteration:
                exhausted = True

    def _on_timeout():
        for f in workers:
            f.cancel()
        workers.clear()
        # Wake up _wait_for_one()
        done.put_nowait(None)

    @asyncio.coroutine
    def _wait_for_one():
        f = yield from done.get()
        if f is None:
            raise futures.TimeoutError()
        return f.result()

    workers = [asyncio.async(_worker()) for i in range(max_workers)]

    timeout_handle = None
    if workers and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)

    while not exhausted or pending or not done.empty():
        # Like as_completed(), yield coroutines for the caller to await.
        yield _wait_for_one()

    if timeout_handle is not None:
        timeout_handle.cancel()
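
For illustration, calling it could look like this (fetch() and the URLs are
made up; note that the tasks come from a generator, so only max_workers of
them exist at any time):

@asyncio.coroutine
def fetch(url):
    yield from asyncio.sleep(1)        # stand-in for real network I/O
    return url

@asyncio.coroutine
def main(urls):
    # A generator: each Task is only created when a worker calls next().
    tasks = (asyncio.async(fetch(u)) for u in urls)
    for f in parallelize(tasks, max_workers=2):
        print((yield from f))

asyncio.get_event_loop().run_until_complete(
    main(['http://a.example', 'http://b.example', 'http://c.example']))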


Re: asyncio with map&reduce flavor and without flooding the event loop

2014-08-06 Thread Maxime Steisel
2014-08-03 16:01 GMT+02:00 Valery Khamenya :
> Hi all
>
> [snip]
>
> Consider a task like crawling the web starting from some web-sites. Each
> site leads to generation of new downloading tasks in exponential(!)
> progression. However, we want neither to flood the event loop nor to
> overload our network. We'd like to control the task flow. This I achieve
> well with a modification of Maxime's nice solution proposed here:
> https://mail.python.org/pipermail/python-list/2014-July/675048.html
>
> But I'd also need a very natural thing, a kind of map() & reduce(), or
> functools.reduce() if we are on Python 3 already. That is, I'd need to call
> a "summarizing" function for all the downloading tasks completed on links
> from a page. This is where I fail :(

Hi Valery,

With the modified as_completed(), you can write map and reduce
primitives quite naturally.

It could look like this:


===
def async_map(corofunc, *iterables):
    """
    Equivalent to map(corofunc, *iterables) except that corofunc must
    be a coroutine function and is executed asynchronously.

    This is not a coroutine, just a normal generator yielding Task
    instances, so the Tasks are only created (and scheduled) as they
    are consumed.
    """
    for args in zip(*iterables):
        yield asyncio.async(corofunc(*args))

@asyncio.coroutine
def async_reduce(corofunc, futures, initial=0):
    """
    Equivalent to functools.reduce(corofunc, [f.result() for f in futures])
    except that corofunc must be a coroutine function and future results
    can be evaluated out of order.

    This function is a coroutine.
    """
    result = initial
    # as_completed() here is the bounded-concurrency variant from the
    # earlier message, not asyncio.as_completed().
    for f in as_completed(futures, max_workers=50):
        new_value = (yield from f)
        result = (yield from corofunc(result, new_value))
    return result

===
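
For example, summarizing the pages downloaded from a list of links might
look like this (download() and add_size() are hypothetical stand-ins):

@asyncio.coroutine
def download(url):
    yield from asyncio.sleep(1)        # stand-in for a real HTTP fetch
    return 'body of %s' % url

@asyncio.coroutine
def add_size(total, body):
    return total + len(body)

@asyncio.coroutine
def crawl_and_summarize(urls):
    page_futures = async_map(download, urls)
    return (yield from async_reduce(add_size, page_futures, initial=0))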

Best,

Maxime