> On Sep 15, 2021, at 6:04 AM, Stefan Hajnoczi <stefa...@redhat.com> wrote:
>
> On Wed, Sep 15, 2021 at 12:21:10AM +0000, John Johnson wrote:
>>
>>> On Sep 14, 2021, at 6:06 AM, Stefan Hajnoczi <stefa...@redhat.com> wrote:
>>>
>>> On Mon, Sep 13, 2021 at 05:23:33PM +0000, John Johnson wrote:
>>>>>> On Sep 9, 2021, at 10:25 PM, John Johnson <john.g.john...@oracle.com> wrote:
>>>>>>> On Sep 8, 2021, at 11:29 PM, Stefan Hajnoczi <stefa...@redhat.com> wrote:
>>>>>>> On Thu, Sep 09, 2021 at 05:11:49AM +0000, John Johnson wrote:
>>>>>>>> I did look at coroutines, but they seemed to work when the sender
>>>>>>>> is triggering the coroutine on send, not when request packets are
>>>>>>>> arriving asynchronously to the sends.
>>>>>>>
>>>>>>> This can be done with a receiver coroutine. Its job is to be the only
>>>>>>> thing that reads vfio-user messages from the socket. A receiver
>>>>>>> coroutine reads messages from the socket and wakes up the waiting
>>>>>>> coroutine that yielded from vfio_user_send_recv() or
>>>>>>> vfio_user_pci_process_req().
>>>>>>>
>>>>>>> (Although vfio_user_pci_process_req() could be called directly from the
>>>>>>> receiver coroutine, it seems safer to have a separate coroutine that
>>>>>>> processes requests so that the receiver isn't blocked in case
>>>>>>> vfio_user_pci_process_req() yields while processing a request.)
>>>>>>>
>>>>>>> Going back to what you mentioned above, the receiver coroutine does
>>>>>>> something like this:
>>>>>>>
>>>>>>>   if it's a reply
>>>>>>>       reply = find_reply(...)
>>>>>>>       qemu_coroutine_enter(reply->co) // instead of signalling reply->cv
>>>>>>>   else
>>>>>>>       QSIMPLEQ_INSERT_TAIL(&pending_reqs, request, next);
>>>>>>>       if (pending_reqs_was_empty) {
>>>>>>>           qemu_coroutine_enter(process_request_co);
>>>>>>>       }
>>>>>>>
>>>>>>> The pending_reqs queue holds incoming requests that the
>>>>>>> process_request_co coroutine processes.
>>>>>>>
>>>>>>
>>>>>> How do coroutines work across threads? There can be multiple vCPU
>>>>>> threads waiting for replies, and I think the receiver coroutine will be
>>>>>> running in the main loop thread. Where would a vCPU block waiting for
>>>>>> a reply? I think coroutine_yield() returns to its coroutine_enter()
>>>>>> caller
>>>>>
>>>>> A vCPU thread holding the BQL can iterate the event loop if it has
>>>>> reached a synchronous point that needs to wait for a reply before
>>>>> returning. I think we have this situation when a MemoryRegion is
>>>>> accessed on the proxy device.
>>>>>
>>>>> For example, block/block-backend.c:blk_prw() kicks off a coroutine and
>>>>> then runs the event loop until the coroutine finishes:
>>>>>
>>>>>   Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
>>>>>   bdrv_coroutine_enter(blk_bs(blk), co);
>>>>>   BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
>>>>>
>>>>> BDRV_POLL_WHILE() boils down to a loop like this:
>>>>>
>>>>>   while ((cond)) {
>>>>>       aio_poll(ctx, true);
>>>>>   }
>>>>>
>>>>
>>>> I think that would make vCPUs sending requests and the
>>>> receiver coroutine all poll on the same socket. If the “wrong”
>>>> routine reads the message, I’d need a second level of synchronization
>>>> to pass the message to the “right” one. e.g., if the vCPU coroutine
>>>> reads a request, it needs to pass it to the receiver; if the receiver
>>>> coroutine reads a reply, it needs to pass it to a vCPU.
>>>>
>>>> Avoiding this complexity is one of the reasons I went with
>>>> a separate thread that only reads the socket over the mp-qemu model,
>>>> which does have the sender poll, but doesn’t need to handle incoming
>>>> requests.
>>>
>>> Only one coroutine reads from the socket, the "receiver" coroutine. In a
>>> previous reply I sketched what the receiver does:
>>>
>>>   if it's a reply
>>>       reply = find_reply(...)
>>>       qemu_coroutine_enter(reply->co) // instead of signalling reply->cv
>>>   else
>>>       QSIMPLEQ_INSERT_TAIL(&pending_reqs, request, next);
>>>       if (pending_reqs_was_empty) {
>>>           qemu_coroutine_enter(process_request_co);
>>>       }
>>>
>>
>> Sorry, I was assuming when you said the coroutine will block with
>> aio_poll(), you implied it would also read messages from the socket.
>
> The vCPU thread calls aio_poll() outside the coroutine, similar to the
> block/block-backend.c:blk_prw() example I posted above:
>
>   Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
>   bdrv_coroutine_enter(blk_bs(blk), co);
>   BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
>
> (BDRV_POLL_WHILE() is an aio_poll() loop.)
>
> The coroutine isn't aware of aio_poll(), it just yields when it needs to
> wait.
>
>>> The qemu_coroutine_enter(reply->co) call re-enters the coroutine that
>>> was created by the vCPU thread. Is this the "second level of
>>> synchronization" that you described? It's very similar to signalling
>>> reply->cv in the existing patch.
>>>
>>
>> Yes, the only difference is it would be woken on each message,
>> even though it doesn’t read them. Which is what I think you’re addressing
>> below.
>>
>>> Now I'm actually thinking about whether this can be improved by keeping
>>> the condvar so that the vCPU thread doesn't need to call aio_poll()
>>> (which is awkward because it doesn't drop the BQL and therefore blocks
>>> other vCPUs from making progress). That approach wouldn't require a
>>> dedicated thread for vfio-user.
>>>
>>
>> Wouldn’t you need to acquire BQL twice for every vCPU reply: once to
>> run the receiver coroutine, and once when the vCPU thread wakes up and
>> wants to return to the VFIO code? The migration thread would also add a
>> BQL dependency, where it didn’t have one before.
>
> If aio_poll() is used then the vCPU thread doesn't drop the BQL at all.
> The vCPU thread sends the message and waits for the reply while other
> BQL threads are locked out.
>
> If a condvar or similar mechanism is used then the vCPU sends the
> message, drops the BQL, and waits on the condvar. The main loop thread
> runs the receiver coroutine and re-enters the coroutine, which signals
> the condvar. The vCPU then re-acquires the BQL.
>

I understand this. The point I was trying to make was that you'd need to
acquire the BQL twice for every reply: once by the main loop before it runs
the receiver coroutine and once after the vCPU wakes up. That would seem to
increase latency over the iothread model.

>> Is your objection with using an iothread, or using a separate thread?
>> I can change to using qemu_thread_create() and running aio_poll() from the
>> thread routine, instead of creating an iothread.
>
> The vfio-user communication code shouldn't need to worry about threads
> or locks. The code can be written in terms of AioContext so the caller
> can use it from various environments without hardcoding details of the
> BQL or threads into the communication code. This makes it easier to
> understand and less tightly coupled.
>
> I'll try to sketch how that could work:
>
> The main concept is VFIOProxy, which has a QIOChannel (the socket
> connection) and its main API is:
>
>   void coroutine_fn vfio_user_co_send_recv(VFIOProxy *proxy,
>       VFIOUserHdr *msg, VFIOUserFDs *fds, int rsize, int flags);
>
> There is also a request callback for processing incoming requests:
>
>   void coroutine_fn (*request)(void *opaque, char *buf,
>       VFIOUserFDs *fds);
>
> The main loop thread can either use vfio_user_co_send_recv() from
> coroutine context or use this blocking wrapper:
>
>   typedef struct {
>       VFIOProxy *proxy;
>       VFIOUserHdr *msg;
>       VFIOUserFDs *fds;
>       int rsize;
>       int flags;
>       bool done;
>   } VFIOUserSendRecvData;
>
>   static void coroutine_fn vfu_send_recv_co(void *opaque)
>   {
>       VFIOUserSendRecvData *data = opaque;
>       vfio_user_co_send_recv(data->proxy, data->msg, data->fds,
>                              data->rsize, data->flags);
>       data->done = true;
>   }
>
>   /* A blocking version of vfio_user_co_send_recv() */
>   void vfio_user_send_recv(VFIOProxy *proxy, VFIOUserHdr *msg,
>                            VFIOUserFDs *fds, int rsize, int flags)
>   {
>       VFIOUserSendRecvData data = {
>           .proxy = proxy,
>           .msg = msg,
>           .fds = fds,
>           .rsize = rsize,
>           .flags = flags,
>       };
>       Coroutine *co = qemu_coroutine_create(vfu_send_recv_co, &data);
>       qemu_coroutine_enter(co);
>       while (!data.done) {
>           aio_poll(proxy->ioc->ctx, true);
>       }
>   }
>
> The vCPU thread can use vfio_user_send_recv() if it wants, although the
> BQL will be held, preventing other threads from making progress. That
> can be avoided by writing a similar wrapper that uses a QemuSemaphore:
>
>   typedef struct {
>       VFIOProxy *proxy;
>       VFIOUserHdr *msg;
>       VFIOUserFDs *fds;
>       int rsize;
>       int flags;
>       QemuSemaphore sem;
>   } VFIOUserSendRecvData;
>
>   static void coroutine_fn vfu_vcpu_send_recv_co(void *opaque)
>   {
>       VFIOUserSendRecvData *data = opaque;
>       vfio_user_co_send_recv(data->proxy, data->msg, data->fds,
>                              data->rsize, data->flags);
>       qemu_sem_post(&data->sem);
>   }
>
>   /*
>    * A blocking version of vfio_user_co_send_recv() that relies on
>    * another thread to run the event loop. This can be used from vCPU
>    * threads to avoid hogging the BQL.
>    */
>   void vfio_user_vcpu_send_recv(VFIOProxy *proxy, VFIOUserHdr *msg,
>                                 VFIOUserFDs *fds, int rsize, int flags)
>   {
>       VFIOUserSendRecvData data = {
>           .proxy = proxy,
>           .msg = msg,
>           .fds = fds,
>           .rsize = rsize,
>           .flags = flags,
>       };
>       Coroutine *co = qemu_coroutine_create(vfu_vcpu_send_recv_co, &data);
>
>       qemu_sem_init(&data.sem, 0);
>
>       qemu_coroutine_enter(co);
>
>       qemu_mutex_unlock_iothread();
>       qemu_sem_wait(&data.sem);
>       qemu_mutex_lock_iothread();
>
>       qemu_sem_destroy(&data.sem);
>   }
>
> With vfio_user_vcpu_send_recv() the vCPU thread doesn't call aio_poll()
> itself but instead relies on the main loop thread to run the event loop.
>

I think this means I need 2 send algorithms: one for when called from the
main loop, and another for when called outside the main loop (vCPU or
migration). I can’t use the semaphore version from the main loop, since
blocking the main loop would prevent the receiver routine from being
scheduled, so I’d want to use aio_poll() there. Some vfio_user calls can
come from either place (e.g., realize uses REGION_READ to read the device
config space, and vCPU uses it on a guest load to the device), so I’d need
to detect which thread I’m running in to choose the right sender.
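
The dispatch I have in mind would look something like the sketch below.
This is only an illustration: vfio_user_send_recv_any(), the recorded
main-loop thread, and vfio_user_record_main_thread() are names I'm making
up here, not anything in the existing patches, and there may well be a
nicer way to test which thread we're running in:

    /*
     * Sketch only: pick between the two blocking wrappers above based
     * on the calling thread.  vfio_user_main_thread would be recorded
     * once from realize, which runs in the main loop thread.
     */
    static QemuThread vfio_user_main_thread;

    void vfio_user_record_main_thread(void)
    {
        qemu_thread_get_self(&vfio_user_main_thread);
    }

    void vfio_user_send_recv_any(VFIOProxy *proxy, VFIOUserHdr *msg,
                                 VFIOUserFDs *fds, int rsize, int flags)
    {
        if (qemu_thread_is_self(&vfio_user_main_thread)) {
            /* main loop caller: drive the event loop with aio_poll() */
            vfio_user_send_recv(proxy, msg, fds, rsize, flags);
        } else {
            /* vCPU/migration caller: drop the BQL, wait on the semaphore */
            vfio_user_vcpu_send_recv(proxy, msg, fds, rsize, flags);
        }
    }
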
> By writing coroutines that run in proxy->ioc->ctx we keep the threading
> model and locking in the caller. The communication code isn't aware of
> or tied to specific threads. It's possible to drop proxy->lock because
> state is only changed from within the AioContext, not multiple threads
> that may run in parallel. I think this makes the communication code
> simpler and cleaner.
>
> It's possible to use IOThreads with this approach: set the QIOChannel's
> AioContext to the IOThread AioContext. However, I don't think we can do
> this in the vhost-user server yet because QEMU's device models expect to
> run with the BQL and not in an IOThread.
>
> I didn't go into detail about how vfio_user_co_send_recv() is
> implemented. Please let me know if you want me to share ideas about
> that, but it's what we've already discussed with a "receiver" coroutine
> that re-enters the reply coroutines or calls ->request(). A CoMutex is
> needed around qio_channel_write_all() to ensure that coroutines sending
> messages don't interleave partial writes if the socket sndbuf is
> exhausted.
>

Here is where I questioned how coroutines work across threads. When the
reply waiter is not the main loop, would the receiver coroutine re-enter
the reply coroutine or signal the condvar it is waiting on?

>> On a related subject:
>>
>> On Aug 24, 2021, at 8:14 AM, Stefan Hajnoczi <stefa...@redhat.com> wrote:
>>
>>>> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
>>>> +                                 &local_err);
>>>
>>> This is a blocking call. My understanding is that the IOThread is shared
>>> by all vfio-user devices, so other devices will have to wait if one of
>>> them is acting up (e.g. the device emulation process sent less than
>>> sizeof(msg) bytes).
>>
>> This shouldn’t block if the emulation process sends less than sizeof(msg)
>> bytes. qio_channel_readv() will eventually call recvmsg(), which only
>> blocks a short read if MSG_WAITALL is set, and it’s not set in this case.
>> recvmsg() will return the data available, and vfio_user_recv() will treat
>> a short read as an error.
>
> That's true but vfio_user_recv() can still block later on: if only
> sizeof(msg) bytes are available and msg.size > sizeof(msg) then the
> second call blocks.
>
>   msgleft = msg.size - sizeof(msg);
>   if (msgleft != 0) {
>       ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
>
> I think either code should be non-blocking or it shouldn't be. Writing
> code that is partially non-blocking is asking for trouble because it's
> not obvious where it can block and misbehaving or malicious programs can
> cause it to block.
>

I wonder if I should just go fully non-blocking, and have the senders
queue messages for the sending routine, and have the receiving routine
either signal a reply waiter or schedule a request handling routine.

JJ
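
P.S. To make the fully non-blocking idea a bit more concrete, here is the
rough shape I have in mind. It is only a sketch of the flow, not working
code: the VFIOUserMsg struct, the outgoing/incoming queues and ioc->ctx on
VFIOProxy, and the vfio_user_send_bh / vfio_user_read_one /
vfio_user_wake_waiter / vfio_user_request_bh helpers are all placeholders,
and error handling and the exact reply-flag test are hand-waved:

    typedef struct VFIOUserMsg {
        VFIOUserHdr hdr;
        QSIMPLEQ_ENTRY(VFIOUserMsg) next;
        /* payload, FDs, reply waiter, ... */
    } VFIOUserMsg;

    /*
     * Sender side: callers (vCPU, migration, main loop) never write the
     * socket themselves.  They queue the message and kick a bottom half
     * that runs in the proxy's AioContext and writes as much as the
     * socket will take, re-arming on write-ready if it would block.
     */
    static void vfio_user_queue_send(VFIOProxy *proxy, VFIOUserMsg *msg)
    {
        qemu_mutex_lock(&proxy->lock);
        QSIMPLEQ_INSERT_TAIL(&proxy->outgoing, msg, next);
        qemu_mutex_unlock(&proxy->lock);
        aio_bh_schedule_oneshot(proxy->ioc->ctx, vfio_user_send_bh, proxy);
    }

    /*
     * Receiver side: runs when the socket is readable and never blocks.
     * A complete reply wakes its waiter; a complete request is queued
     * for a request-handling bottom half so the receiver is never held
     * up by request processing.
     */
    static void vfio_user_recv_ready(void *opaque)
    {
        VFIOProxy *proxy = opaque;
        VFIOUserMsg *msg = vfio_user_read_one(proxy); /* NULL on partial read */

        if (msg == NULL) {
            return;
        }
        if (msg->hdr.flags & VFIO_USER_REPLY) {
            vfio_user_wake_waiter(proxy, msg); /* signal cv or enter coroutine */
        } else {
            QSIMPLEQ_INSERT_TAIL(&proxy->incoming, msg, next);
            aio_bh_schedule_oneshot(proxy->ioc->ctx, vfio_user_request_bh, proxy);
        }
    }

With this shape proxy->lock would only protect the outgoing queue, and the
socket itself would only ever be touched from the proxy's AioContext.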