From: Ruslan Ruslichenko <[email protected]> Implement the mechanism to transfer packets from the dedicated protocol thread to the main QEMU execution loop for processing.
The patch adds the following features: - signaling logic using internal pipe to wake up the main loop - the rp_process handler, which retrieves packets from queue and dispatches them to the target Remote Port device. This enables QEMU device models to handle remote events. Signed-off-by: Edgar E. Iglesias <[email protected]> Signed-off-by: Takahiro Nakata <[email protected]> Signed-off-by: Ruslan Ruslichenko <[email protected]> --- hw/core/remote-port.c | 148 +++++++++++++++++++++++++++++++++- include/hw/core/remote-port.h | 5 ++ 2 files changed, 152 insertions(+), 1 deletion(-) diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c index 91b0682884..e44d9249c3 100644 --- a/hw/core/remote-port.c +++ b/hw/core/remote-port.c @@ -52,6 +52,8 @@ #define REMOTE_PORT_CLASS(klass) \ OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT) +static void rp_event_read(void *opaque); + static void rp_pkt_dump(const char *prefix, const char *buf, size_t len) { qemu_hexdump(stdout, prefix, buf, len); @@ -96,6 +98,12 @@ ssize_t rp_write(RemotePort *s, const void *buf, size_t count) return r; } +static unsigned int rp_has_work(RemotePort *s) +{ + unsigned int work = s->rx_queue.wpos - s->rx_queue.rpos; + return work; +} + static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt) { s->peer.version = pkt->hello.version; @@ -187,6 +195,135 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name) return chr; } +void rp_process(RemotePort *s) +{ + while (true) { + struct rp_pkt *pkt; + unsigned int rpos; + bool actioned = false; + RemotePortDevice *dev; + RemotePortDeviceClass *rpdc; + + qemu_mutex_lock(&s->rsp_mutex); + if (!rp_has_work(s)) { + qemu_mutex_unlock(&s->rsp_mutex); + break; + } + rpos = s->rx_queue.rpos; + + pkt = s->rx_queue.pkt[rpos].pkt; + D(qemu_log("%s: io-thread rpos=%d wpos=%d cmd=%d dev=%d\n", + s->prefix, s->rx_queue.rpos, s->rx_queue.wpos, + pkt->hdr.cmd, pkt->hdr.dev)); + + /* + * To handle recursiveness, we need to advance the index + * index before processing the packet. + */ + s->rx_queue.rpos++; + s->rx_queue.rpos %= ARRAY_SIZE(s->rx_queue.pkt); + qemu_mutex_unlock(&s->rsp_mutex); + + dev = s->devs[pkt->hdr.dev]; + if (dev) { + rpdc = REMOTE_PORT_DEVICE_GET_CLASS(dev); + if (rpdc->ops[pkt->hdr.cmd]) { + rpdc->ops[pkt->hdr.cmd](dev, pkt); + actioned = true; + } + } + + switch (pkt->hdr.cmd) { + /* TBD */ + default: + assert(actioned); + } + + s->rx_queue.inuse[rpos] = false; + qemu_sem_post(&s->rx_queue.sem); + } +} + +static void rp_event_read(void *opaque) +{ + RemotePort *s = REMOTE_PORT(opaque); + unsigned char buf[32]; + ssize_t r; + + /* We don't care about the data. Just read it out to clear the event. */ + do { +#ifdef _WIN32 + r = qemu_recv_wrap(s->event.pipe.read, buf, sizeof buf, 0); +#else + r = read(s->event.pipe.read, buf, sizeof buf); +#endif + if (r == 0) { + return; + } + } while (r == sizeof buf || (r < 0 && errno == EINTR)); + + rp_process(s); +} + +static void rp_event_notify(RemotePort *s) +{ + unsigned char d = 0; + ssize_t r; + +#ifdef _WIN32 + /* Mingw is sensitive about doing write's to socket descriptors. */ + r = qemu_send_wrap(s->event.pipe.write, &d, sizeof d, 0); +#else + r = qemu_write_full(s->event.pipe.write, &d, sizeof d); +#endif + if (r == 0) { + hw_error("%s: pipe closed\n", s->prefix); + } +} + +/* Handover a pkt to CPU or IO-thread context. */ +static void rp_pt_handover_pkt(RemotePort *s, RemotePortDynPkt *dpkt) +{ + bool full; + + /* + * Take the rsp lock around the wpos update, otherwise + * rp_wait_resp will race with us. + */ + qemu_mutex_lock(&s->rsp_mutex); + s->rx_queue.wpos++; + s->rx_queue.wpos %= ARRAY_SIZE(s->rx_queue.pkt); + /* + * Ensure rx_queue index update is visible to consumer + * before signaling event, to prevent lost wakeup + */ + smp_mb(); + rp_event_notify(s); + qemu_cond_signal(&s->progress_cond); + qemu_mutex_unlock(&s->rsp_mutex); + + do { + full = s->rx_queue.inuse[s->rx_queue.wpos]; + if (full) { + qemu_log("%s: FULL rx queue %d\n", __func__, s->rx_queue.wpos); + if (qemu_sem_timedwait(&s->rx_queue.sem, 2 * 1000) != 0) { +#ifndef _WIN32 + int sval; + +#ifndef CONFIG_SEM_TIMEDWAIT + sval = s->rx_queue.sem.count; +#else + sem_getvalue(&s->rx_queue.sem.sem, &sval); +#endif + qemu_log("semwait: %d rpos=%u wpos=%u\n", sval, + s->rx_queue.rpos, s->rx_queue.wpos); +#endif + qemu_log("Deadlock?\n"); + } + } + } while (full); +} + static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt) { struct rp_pkt *pkt = dpkt->pkt; @@ -208,7 +345,7 @@ static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt) case RP_CMD_interrupt: case RP_CMD_ats_req: case RP_CMD_ats_inv: - /* TBD */; + rp_pt_handover_pkt(s, dpkt); break; default: g_assert_not_reached(); @@ -312,6 +449,8 @@ static void rp_realize(DeviceState *dev, Error **errp) s->prefix = object_get_canonical_path(OBJECT(dev)); qemu_mutex_init(&s->write_mutex); + qemu_mutex_init(&s->rsp_mutex); + qemu_cond_init(&s->progress_cond); if (!qemu_chr_fe_get_driver(&s->chr)) { char *name; @@ -413,6 +552,7 @@ static void rp_realize(DeviceState *dev, Error **errp) s->prefix); exit(EXIT_FAILURE); } + qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s); } #else if (!g_unix_open_pipe(s->event.pipes, FD_CLOEXEC, NULL)) { @@ -427,7 +567,10 @@ static void rp_realize(DeviceState *dev, Error **errp) exit(EXIT_FAILURE); } + qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s); #endif + + qemu_sem_init(&s->rx_queue.sem, ARRAY_SIZE(s->rx_queue.pkt) - 1); } static void rp_unrealize(DeviceState *dev) @@ -436,6 +579,9 @@ static void rp_unrealize(DeviceState *dev) s->finalizing = true; + /* Unregister handler. */ + qemu_set_fd_handler(s->event.pipe.read, NULL, NULL, s); + info_report("%s: Wait for remote-port to disconnect", s->prefix); qemu_chr_fe_disconnect(&s->chr); qemu_thread_join(&s->thread); diff --git a/include/hw/core/remote-port.h b/include/hw/core/remote-port.h index b88e523894..21dfbe89cd 100644 --- a/include/hw/core/remote-port.h +++ b/include/hw/core/remote-port.h @@ -74,6 +74,9 @@ struct RemotePort { char *chrdev_id; struct rp_peer_state peer; + QemuMutex rsp_mutex; + QemuCond progress_cond; + #define RX_QUEUE_SIZE 1024 struct { /* This array must be sized minimum 2 and always a power of 2. */ @@ -100,6 +103,8 @@ struct RemotePort { RemotePortDevice *devs[REMOTE_PORT_MAX_DEVS]; }; +void rp_process(RemotePort *s); + ssize_t rp_write(RemotePort *s, const void *buf, size_t count); #endif -- 2.43.0
