On Thu, Feb 05, 2026 at 08:58:05PM +0100, Ruslan Ruslichenko wrote:
> From: Ruslan Ruslichenko <[email protected]>
> 
> Implement the mechanism to transfer packets from the dedicated
> protocol thread to the main QEMU execution loop for processing.
> 
> The patch adds the following features:
> - signaling logic using an internal pipe to wake up the main loop
> - the rp_process handler, which retrieves packets from the queue and
>   dispatches them to the target Remote Port device.
> 
> This enables QEMU device models to handle remote events.
> 
> Signed-off-by: Edgar E. Iglesias <[email protected]>
> Signed-off-by: Takahiro Nakata <[email protected]>
> Signed-off-by: Ruslan Ruslichenko <[email protected]>
> ---
>  hw/core/remote-port.c         | 148 +++++++++++++++++++++++++++++++++-
>  include/hw/core/remote-port.h |   5 ++
>  2 files changed, 152 insertions(+), 1 deletion(-)
> 
> diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
> index 91b0682884..e44d9249c3 100644
> --- a/hw/core/remote-port.c
> +++ b/hw/core/remote-port.c
> @@ -52,6 +52,8 @@
>  #define REMOTE_PORT_CLASS(klass)    \
>       OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
>  
> +static void rp_event_read(void *opaque);
> +
>  static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
>  {
>      qemu_hexdump(stdout, prefix, buf, len);
> @@ -96,6 +98,12 @@ ssize_t rp_write(RemotePort *s, const void *buf, size_t count)
>      return r;
>  }
>  
> +static unsigned int rp_has_work(RemotePort *s)
> +{
> +    unsigned int work = s->rx_queue.wpos - s->rx_queue.rpos;
> +    return work;
> +}
> +
>  static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
>  {
>      s->peer.version = pkt->hello.version;
> @@ -187,6 +195,135 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name)
>      return chr;
>  }
>  
> +void rp_process(RemotePort *s)
> +{
> +    while (true) {
> +        struct rp_pkt *pkt;
> +        unsigned int rpos;
> +        bool actioned = false;
> +        RemotePortDevice *dev;
> +        RemotePortDeviceClass *rpdc;
> +
> +        qemu_mutex_lock(&s->rsp_mutex);
> +        if (!rp_has_work(s)) {
> +            qemu_mutex_unlock(&s->rsp_mutex);
> +            break;
> +        }
> +        rpos = s->rx_queue.rpos;
> +
> +        pkt = s->rx_queue.pkt[rpos].pkt;
> +        D(qemu_log("%s: io-thread rpos=%d wpos=%d cmd=%d dev=%d\n",
> +                 s->prefix, s->rx_queue.rpos, s->rx_queue.wpos,
> +                 pkt->hdr.cmd, pkt->hdr.dev));

Same point as on the last patch: emitting structured data via
tracepoints is preferable to merely logging. Consider this to apply
to any other qemu_log call too, so I won't repeat it every time.
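
For illustration only (the event name is entirely made up, pick
whatever fits the existing entries in hw/core/trace-events):

  remote_port_process(const char *prefix, int rpos, int wpos, int cmd, int dev) "%s: rpos=%d wpos=%d cmd=%d dev=%d"

and at the call site in remote-port.c:

  #include "trace.h"

  trace_remote_port_process(s->prefix, s->rx_queue.rpos, s->rx_queue.wpos,
                            pkt->hdr.cmd, pkt->hdr.dev);

That gives structured data that any of the trace backends can consume,
instead of an unconditional qemu_log hidden behind a debug macro.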

> +
> +        /*
> +         * To handle recursion, we need to advance the read index
> +         * before processing the packet.
> +         */
> +        s->rx_queue.rpos++;
> +        s->rx_queue.rpos %= ARRAY_SIZE(s->rx_queue.pkt);
> +        qemu_mutex_unlock(&s->rsp_mutex);
> +
> +        dev = s->devs[pkt->hdr.dev];
> +        if (dev) {
> +            rpdc = REMOTE_PORT_DEVICE_GET_CLASS(dev);
> +            if (rpdc->ops[pkt->hdr.cmd]) {
> +                rpdc->ops[pkt->hdr.cmd](dev, pkt);
> +                actioned = true;
> +            }
> +        }
> +
> +        switch (pkt->hdr.cmd) {
> +        /* TBD */
> +        default:
> +            assert(actioned);
> +        }
> +
> +        s->rx_queue.inuse[rpos] = false;
> +        qemu_sem_post(&s->rx_queue.sem);
> +    }
> +}
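
Not a problem with this patch, just spelling out how I understand the
dispatch above is meant to be consumed, for anyone reading along. A
device model would register per-command handlers in its class_init,
roughly like the sketch below (the device name, the
REMOTE_PORT_DEVICE_CLASS cast macro and the RP_CMD_write constant are
my assumptions from the earlier patches in this series):

  static void my_rp_dev_write(RemotePortDevice *rpd, struct rp_pkt *pkt)
  {
      /* Decode the write request carried in pkt and complete it. */
  }

  static void my_rp_dev_class_init(ObjectClass *klass, void *data)
  {
      RemotePortDeviceClass *rpdc = REMOTE_PORT_DEVICE_CLASS(klass);

      /*
       * rp_process() indexes ops[] by pkt->hdr.cmd, so this handler
       * runs for write packets addressed to this device's dev index.
       */
      rpdc->ops[RP_CMD_write] = my_rp_dev_write;
  }
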
> +
> +static void rp_event_read(void *opaque)
> +{
> +    RemotePort *s = REMOTE_PORT(opaque);
> +    unsigned char buf[32];
> +    ssize_t r;
> +
> +    /* We don't care about the data. Just read it out to clear the event.  */
> +    do {
> +#ifdef _WIN32
> +        r = qemu_recv_wrap(s->event.pipe.read, buf, sizeof buf, 0);
> +#else
> +        r = read(s->event.pipe.read, buf, sizeof buf);
> +#endif
> +        if (r == 0) {
> +            return;
> +        }
> +    } while (r == sizeof buf || (r < 0 && errno == EINTR));
> +
> +    rp_process(s);
> +}
> +
> +static void rp_event_notify(RemotePort *s)
> +{
> +    unsigned char d = 0;
> +    ssize_t r;
> +
> +#ifdef _WIN32
> +    /* Mingw is sensitive about doing writes to socket descriptors.  */
> +    r = qemu_send_wrap(s->event.pipe.write, &d, sizeof d, 0);
> +#else
> +    r = qemu_write_full(s->event.pipe.write, &d, sizeof d);
> +#endif
> +    if (r == 0) {
> +        hw_error("%s: pipe closed\n", s->prefix);
> +    }
> +}
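
As an aside for archive readers: I'm assuming the read end of this
pipe gets registered with the main loop at realize time elsewhere in
the series, roughly along the lines of:

  /* Presumed realize-time wiring, not part of this hunk. */
  qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s);

which is what makes rp_event_read() run in main loop context and drain
the queue via rp_process().
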
> +
> +/* Hand over a pkt to CPU or IO-thread context.  */
> +static void rp_pt_handover_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
> +{
> +    bool full;
> +
> +    /*
> +     * Take the rsp lock around the wpos update, otherwise
> +     * rp_wait_resp will race with us.
> +     */
> +    qemu_mutex_lock(&s->rsp_mutex);
> +    s->rx_queue.wpos++;
> +    s->rx_queue.wpos %= ARRAY_SIZE(s->rx_queue.pkt);
> +    /*
> +     * Ensure the rx_queue index update is visible to the consumer
> +     * before signaling the event, to prevent a lost wakeup.
> +     */
> +    smp_mb();
> +    rp_event_notify(s);
> +    qemu_cond_signal(&s->progress_cond);
> +    qemu_mutex_unlock(&s->rsp_mutex);
> +
> +    do {
> +        full = s->rx_queue.inuse[s->rx_queue.wpos];
> +        if (full) {
> +            qemu_log("%s: FULL rx queue %d\n", __func__, s->rx_queue.wpos);
> +            if (qemu_sem_timedwait(&s->rx_queue.sem, 2 * 1000) != 0) {
> +#ifndef _WIN32
> +                int sval;
> +
> +#ifndef CONFIG_SEM_TIMEDWAIT
> +                sval = s->rx_queue.sem.count;
> +#else
> +                sem_getvalue(&s->rx_queue.sem.sem, &sval);
> +#endif
> +                qemu_log("semwait: %d rpos=%u wpos=%u\n", sval,
> +                         s->rx_queue.rpos, s->rx_queue.wpos);
> +#endif
> +                qemu_log("Deadlock?\n");
> +            }
> +        }
> +    } while (full);
> +}

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

