[issue43119] asyncio.Queue.put never yields if the queue is unbounded

2021-02-03 Thread Spencer Nelson


New submission from Spencer Nelson:

I am writing some software that reads records from a very large file (hundreds 
of GB), putting them in an `asyncio.Queue` as it goes, while a chain of 
consumers handles each record and does work over the network.

To my surprise, my program runs out of memory because the Queue producer 
coroutine never yields control. I think (but am not sure) that the 
asyncio.Queue.put method has no preemption point if the queue is not full; I 
was using an unbounded Queue, so it was _never_ full, so my coroutine was never 
unscheduled.

I have attached a file with a minimal reproducer. It creates an unbounded 
queue. A 'publish' task calls `queue.put` for each item of an infinite 
sequence. A 'subscribe' task calls `queue.get` and prints each item. Nothing 
gets printed if I run this, because `queue.put` never suspends and the 
'subscribe' task never gets to run.

I would expect that `await queue.put` would occasionally cede execution to any 
other runnable coroutines, even if the queue is unbounded.
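
A minimal sketch of the kind of reproducer described above. This is not the 
attached never_yields.py; the names `publish`, `subscribe`, `events` and the 
finite count `n` are assumptions made so that the sketch terminates and the 
ordering can be inspected instead of hanging.

```python
import asyncio


async def publish(queue, events, n):
    # Stand-in for the infinite sequence: a finite range keeps the sketch finite.
    for i in range(n):
        await queue.put(i)  # the unbounded queue is never full, so no suspension
        events.append(("put", i))


async def subscribe(queue, events, n):
    for _ in range(n):
        item = await queue.get()
        events.append(("get", item))


async def main(n=5):
    queue = asyncio.Queue()  # maxsize=0 (the default) means unbounded
    events = []
    await asyncio.gather(publish(queue, events, n), subscribe(queue, events, n))
    return events


if __name__ == "__main__":
    # Every "put" is recorded before the first "get".
    print(asyncio.run(main()))
```

With an infinite source in place of `range(n)`, the 'subscribe' task would 
never get a turn at all.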

--
components: asyncio
files: never_yields.py
messages: 386454
nosy: asvetlov, spenczar, yselivanov
priority: normal
severity: normal
status: open
title: asyncio.Queue.put never yields if the queue is unbounded
type: behavior
versions: Python 3.8
Added file: https://bugs.python.org/file49789/never_yields.py

___
Python tracker 
<https://bugs.python.org/issue43119>
___
___
Python-bugs-list mailing list
Unsubscribe: 
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com




2021-02-03 Thread Spencer Nelson


Change by Spencer Nelson:


--
keywords: +patch
pull_requests: +23243
stage:  -> patch review
pull_request: https://github.com/python/cpython/pull/24433





2021-02-04 Thread Spencer Nelson


Spencer Nelson added the comment:

Thanks for testing on more Python versions.

Yes, adding asyncio.sleep(0) after each put is an effective workaround - it's 
certainly possible to manually yield like that. I just think that that's what 
asyncio.Queue.put ought to be doing - in fact, that's my proposed change in the 
associated PR: 
https://github.com/python/cpython/pull/24433/files#diff-22a6bcb03e783378149a3e9411c185b13c908e61886ffd55145634b7ab12caacR141
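
The `asyncio.sleep(0)` workaround mentioned above can be sketched as follows. 
This is an illustration under assumptions, not the code from the PR: the names 
`publish`, `subscribe`, `events` and the finite count `n` are hypothetical.

```python
import asyncio


async def publish(queue, events, n):
    for i in range(n):
        await queue.put(i)
        events.append(("put", i))
        await asyncio.sleep(0)  # explicit yield: lets other ready tasks run


async def subscribe(queue, events, n):
    for _ in range(n):
        item = await queue.get()
        events.append(("get", item))


async def main(n=5):
    queue = asyncio.Queue()  # unbounded, as in the original report
    events = []
    await asyncio.gather(publish(queue, events, n), subscribe(queue, events, n))
    return events


if __name__ == "__main__":
    # Gets now appear in between puts instead of only after the last put.
    print(asyncio.run(main()))
```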

I suppose that, more broadly, I would have expected that _any_ `await f()` call 
would be a preemptible point, but I think that's a much, much larger discussion.

--





2021-02-05 Thread Spencer Nelson


Spencer Nelson added the comment:

Josh,

> Making literally every await equivalent to:
> 
> await asyncio.sleep(0)
> 
> followed by the actual await (which is effectively what you're proposing when 
> you expect all await to be preemptible) means adding non-trivial overhead to 
> all async operations (asyncio is based on system calls of the 
> select/poll/epoll/kpoll variety, which add meaningful overhead when we're 
> talking about an operation that is otherwise equivalent to an extremely cheap 
> simple collections.deque.append call).

A few things:

First, I don't think I proposed that. I was simply saying that my expectations 
on behavior were incorrect, which points towards documentation.

Second, I don't think making a point "preemptible" is the same as actually 
executing a cooperative-style yield to the scheduler. I just expected that it 
would always be in the cards - that it would always be a potential point where 
I'd get scheduled away.

Third, I don't think that await asyncio.sleep(0) triggers a syscall, but I 
certainly could be mistaken. It looks to me like it is special-cased in 
asyncio, from my reading of the source. Again - could be wrong.
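
To make "a cooperative-style yield to the scheduler" concrete, here is a 
minimal sketch of an awaitable that suspends the current task for one pass of 
the event loop by doing a bare `yield` inside `__await__`. `YieldOnce` and 
`worker` are hypothetical names for illustration. The sketch only shows the 
scheduling effect and makes no claim about which system calls the loop issues.

```python
import asyncio


class YieldOnce:
    """Hypothetical helper: awaiting it suspends the task for one loop pass."""

    def __await__(self):
        yield  # a bare yield hands control back to the event loop


async def worker(name, log):
    for step in range(2):
        log.append((name, step))
        await YieldOnce()  # give the other worker a turn


async def main():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


if __name__ == "__main__":
    # The two workers alternate because each one yields after every step.
    print(asyncio.run(main()))
```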

Fourth, I think that the idea of non-cooperative, preemptive scheduling is not 
nearly as bizarre as you make it sound. There's certainly plenty of prior art 
on preemptive schedulers out there. Go uses a sort of partial preemption at 
function call sites *because* it's a particularly efficient way to do things.

But anyway - I didn't really want to discuss this. As I said above, it's 
obviously a way way way bigger design discussion than my specific issue.


> It also breaks many reasonable uses of asyncio.wait and asyncio.as_completed, 
> where the caller can reasonably expect to be able to await the known-complete 
> tasks without being preempted (if you know the coroutine is actually done, it 
> could be quite surprising/problematic when you await it and get preempted, 
> potentially requiring synchronization that wouldn't be necessary otherwise).

I think this cuts both ways. Without reading the source code of asyncio.Queue, 
I don't see how it's possible to know whether its put method yields. Because of 
this, I tend to assume synchronization is necessary everywhere. The way I know 
for sure that a function call can complete without yielding is supposed to be 
that it isn't an `async` function, right? That's why asyncio.Queue.put_nowait 
exists and isn't asynchronous.
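
A small sketch of why the `async` keyword alone does not tell you whether a 
call yields: awaiting a coroutine that never reaches a suspension point runs 
straight through, and an already-scheduled task does not get a turn in the 
meantime. The names `no_suspension`, `busy` and `other` are hypothetical.

```python
import asyncio


async def no_suspension(log, i):
    # Declared async, but never awaits anything that actually suspends.
    log.append(("work", i))


async def busy(log):
    for i in range(3):
        await no_suspension(log, i)  # completes without yielding to the loop
    log.append(("busy", "done"))


async def other(log):
    log.append(("other", "ran"))


async def main():
    log = []
    other_task = asyncio.create_task(other(log))  # scheduled, not yet running
    await busy(log)   # runs to completion inside the current task
    await other_task  # only here does the current task suspend
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```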

> In real life, if whatever you're feeding the queue with is infinite and 
> requires no awaiting to produce each value, you should probably just avoid 
> the queue and have the consumer consume the iterable directly.

The stuff I'm feeding the queue doesn't require awaiting, but I *wish* it did. 
It's just a case of not having the libraries for asynchronicity yet on the 
source side. I was hoping that the queue would let me pace my work in a way 
that would let me do more concurrent work.

> Or just apply a maximum size to the queue; since the source of data to put is 
> infinite and not-awaitable, there's no benefit to an unbounded queue, you may 
> as well use a bound roughly fitted to the number of consumers, because any 
> further items are just wasting memory well ahead of when it's needed.

The problem isn't really that put doesn't yield for unbounded queues - it's 
that put doesn't yield *unless the queue is full*. That means that, if I use a 
very high maximum size for the queue, I'll still spend a big chunk of time 
filling up the queue, and only then will consumers start doing work.
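
A sketch of that behavior with a bounded queue, under the same assumptions as 
before (hypothetical `publish`/`subscribe` names, a finite item count, and a 
small `maxsize` chosen only to keep the output short): the producer records 
`maxsize` puts before the consumer sees its first item.

```python
import asyncio


async def publish(queue, events, n):
    for i in range(n):
        await queue.put(i)  # suspends only once the queue is full
        events.append(("put", i))


async def subscribe(queue, events, n):
    for _ in range(n):
        item = await queue.get()
        events.append(("get", item))


async def main(n=6, maxsize=3):
    queue = asyncio.Queue(maxsize=maxsize)
    events = []
    await asyncio.gather(publish(queue, events, n), subscribe(queue, events, n))
    return events


if __name__ == "__main__":
    # The first `maxsize` events are puts; the first get comes only after that.
    print(asyncio.run(main()))
```

With a very large `maxsize`, that initial run of puts is correspondingly long.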

I could pick a small queue bound, but then I'm more likely to waste time doing 
nothing if consumers are slower than the producer - I'll sit there with a 
full-but-tiny queue. Work-units in the queue can take wildly different amounts 
of time, so consumers will often be briefly slow, so the producer races ahead - 
until it hits its tiny limit. But then new work units arrive, and so the 
consumers are fast again - and they're quickly starved for work because the 
producer didn't build a good backlog.

So the problem remains whenever work takes an uncertain amount of time, which 
would seem to be the common reason for using a queue in the first place.

> Point is, regular queue puts only block (and potentially release the GIL 
> early) when they're full or, as a necessary consequence of threading being 
> less predictable than asyncio, when there is contention on the lock 
> protecting the queue internals (which is usually resolved quickly); why would 
> asyncio queues go out of their way to block when they don't need to?

I think you have it backwards. asyncio.Queue.put *always* blocks other 
coroutines