On Thu, Feb 05, 2026 at 08:58:05PM +0100, Ruslan Ruslichenko wrote:
> From: Ruslan Ruslichenko <[email protected]>
>
> Implement the mechanism to transfer packets from the dedicated
> protocol thread to the main QEMU execution loop for processing.
>
> The patch adds the following features:
> - signaling logic using internal pipe to wake up the main loop
> - the rp_process handler, which retrieves packets from queue and
> dispatches them to the target Remote Port device.
>
> This enables QEMU device models to handle remote events.
>
> Signed-off-by: Edgar E. Iglesias <[email protected]>
> Signed-off-by: Takahiro Nakata <[email protected]>
> Signed-off-by: Ruslan Ruslichenko <[email protected]>
> ---
> hw/core/remote-port.c | 148 +++++++++++++++++++++++++++++++++-
> include/hw/core/remote-port.h | 5 ++
> 2 files changed, 152 insertions(+), 1 deletion(-)
>
> diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
> index 91b0682884..e44d9249c3 100644
> --- a/hw/core/remote-port.c
> +++ b/hw/core/remote-port.c
> @@ -52,6 +52,8 @@
> #define REMOTE_PORT_CLASS(klass) \
> OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
>
> +static void rp_event_read(void *opaque);
> +
> static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
> {
> qemu_hexdump(stdout, prefix, buf, len);
> @@ -96,6 +98,12 @@ ssize_t rp_write(RemotePort *s, const void *buf, size_t
> count)
> return r;
> }
>
> +static unsigned int rp_has_work(RemotePort *s)
> +{
> + unsigned int work = s->rx_queue.wpos - s->rx_queue.rpos;
> + return work;
> +}
> +
> static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
> {
> s->peer.version = pkt->hello.version;
> @@ -187,6 +195,135 @@ static Chardev *rp_autocreate_chardev(RemotePort *s,
> char *name)
> return chr;
> }
>
> +void rp_process(RemotePort *s)
> +{
> + while (true) {
> + struct rp_pkt *pkt;
> + unsigned int rpos;
> + bool actioned = false;
> + RemotePortDevice *dev;
> + RemotePortDeviceClass *rpdc;
> +
> + qemu_mutex_lock(&s->rsp_mutex);
> + if (!rp_has_work(s)) {
> + qemu_mutex_unlock(&s->rsp_mutex);
> + break;
> + }
> + rpos = s->rx_queue.rpos;
> +
> + pkt = s->rx_queue.pkt[rpos].pkt;
> + D(qemu_log("%s: io-thread rpos=%d wpos=%d cmd=%d dev=%d\n",
> + s->prefix, s->rx_queue.rpos, s->rx_queue.wpos,
> + pkt->hdr.cmd, pkt->hdr.dev));
Same point as on the last patch: emitting structured data via
tracepoints is preferable to merely logging. Consider this
to apply to every other qemu_log call as well, so I won't
repeat it each time.
> +
> + /*
> + * To handle recursiveness, we need to advance the index
> + * before processing the packet.
> + */
> + s->rx_queue.rpos++;
> + s->rx_queue.rpos %= ARRAY_SIZE(s->rx_queue.pkt);
> + qemu_mutex_unlock(&s->rsp_mutex);
> +
> + dev = s->devs[pkt->hdr.dev];
> + if (dev) {
> + rpdc = REMOTE_PORT_DEVICE_GET_CLASS(dev);
> + if (rpdc->ops[pkt->hdr.cmd]) {
> + rpdc->ops[pkt->hdr.cmd](dev, pkt);
> + actioned = true;
> + }
> + }
> +
> + switch (pkt->hdr.cmd) {
> + /* TBD */
> + default:
> + assert(actioned);
> + }
> +
> + s->rx_queue.inuse[rpos] = false;
> + qemu_sem_post(&s->rx_queue.sem);
> + }
> +}
> +
> +static void rp_event_read(void *opaque)
> +{
> + RemotePort *s = REMOTE_PORT(opaque);
> + unsigned char buf[32];
> + ssize_t r;
> +
> + /* We don't care about the data. Just read it out to clear the event. */
> + do {
> +#ifdef _WIN32
> + r = qemu_recv_wrap(s->event.pipe.read, buf, sizeof buf, 0);
> +#else
> + r = read(s->event.pipe.read, buf, sizeof buf);
> +#endif
> + if (r == 0) {
> + return;
> + }
> + } while (r == sizeof buf || (r < 0 && errno == EINTR));
> +
> + rp_process(s);
> +}
> +
> +static void rp_event_notify(RemotePort *s)
> +{
> + unsigned char d = 0;
> + ssize_t r;
> +
> +#ifdef _WIN32
> + /* Mingw is sensitive about doing write's to socket descriptors. */
> + r = qemu_send_wrap(s->event.pipe.write, &d, sizeof d, 0);
> +#else
> + r = qemu_write_full(s->event.pipe.write, &d, sizeof d);
> +#endif
> + if (r == 0) {
> + hw_error("%s: pipe closed\n", s->prefix);
> + }
> +}
> +
> +/* Handover a pkt to CPU or IO-thread context. */
> +static void rp_pt_handover_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
> +{
> + bool full;
> +
> + /*
> + * Take the rsp lock around the wpos update, otherwise
> + * rp_wait_resp will race with us.
> + */
> + qemu_mutex_lock(&s->rsp_mutex);
> + s->rx_queue.wpos++;
> + s->rx_queue.wpos %= ARRAY_SIZE(s->rx_queue.pkt);
> + /*
> + * Ensure rx_queue index update is visible to consumer
> + * before signaling event, to prevent lost wakeup
> + */
> + smp_mb();
> + rp_event_notify(s);
> + qemu_cond_signal(&s->progress_cond);
> + qemu_mutex_unlock(&s->rsp_mutex);
> +
> + do {
> + full = s->rx_queue.inuse[s->rx_queue.wpos];
> + if (full) {
> + qemu_log("%s: FULL rx queue %d\n", __func__, s->rx_queue.wpos);
> + if (qemu_sem_timedwait(&s->rx_queue.sem, 2 * 1000) != 0) {
> +#ifndef _WIN32
> + int sval;
> +
> +#ifndef CONFIG_SEM_TIMEDWAIT
> + sval = s->rx_queue.sem.count;
> +#else
> + sem_getvalue(&s->rx_queue.sem.sem, &sval);
> +#endif
> + qemu_log("semwait: %d rpos=%u wpos=%u\n", sval,
> + s->rx_queue.rpos, s->rx_queue.wpos);
> +#endif
> + qemu_log("Deadlock?\n");
> + }
> + }
> + } while (full);
> +}
With regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|