From: Ruslan Ruslichenko <[email protected]>

Implement the mechanism to transfer packets from the dedicated
protocol thread to the main QEMU execution loop for processing.

The patch adds the following features:
- signaling logic using internal pipe to wake up the main loop
- the rp_process handler, which retrieves packets from queue and
dispatches them to the target Remote Port device.

This enables QEMU device models to handle remote events.

Signed-off-by: Edgar E. Iglesias <[email protected]>
Signed-off-by: Takahiro Nakata <[email protected]>
Signed-off-by: Ruslan Ruslichenko <[email protected]>
---
 hw/core/remote-port.c         | 148 +++++++++++++++++++++++++++++++++-
 include/hw/core/remote-port.h |   5 ++
 2 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
index 91b0682884..e44d9249c3 100644
--- a/hw/core/remote-port.c
+++ b/hw/core/remote-port.c
@@ -52,6 +52,8 @@
 #define REMOTE_PORT_CLASS(klass)    \
      OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
 
+static void rp_event_read(void *opaque);
+
 static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
 {
     qemu_hexdump(stdout, prefix, buf, len);
@@ -96,6 +98,12 @@ ssize_t rp_write(RemotePort *s, const void *buf, size_t 
count)
     return r;
 }
 
+static unsigned int rp_has_work(RemotePort *s)
+{
+    unsigned int work = s->rx_queue.wpos - s->rx_queue.rpos;
+    return work;
+}
+
 static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
 {
     s->peer.version = pkt->hello.version;
@@ -187,6 +195,135 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char 
*name)
     return chr;
 }
 
+void rp_process(RemotePort *s)
+{
+    while (true) {
+        struct rp_pkt *pkt;
+        unsigned int rpos;
+        bool actioned = false;
+        RemotePortDevice *dev;
+        RemotePortDeviceClass *rpdc;
+
+        qemu_mutex_lock(&s->rsp_mutex);
+        if (!rp_has_work(s)) {
+            qemu_mutex_unlock(&s->rsp_mutex);
+            break;
+        }
+        rpos = s->rx_queue.rpos;
+
+        pkt = s->rx_queue.pkt[rpos].pkt;
+        D(qemu_log("%s: io-thread rpos=%d wpos=%d cmd=%d dev=%d\n",
+                 s->prefix, s->rx_queue.rpos, s->rx_queue.wpos,
+                 pkt->hdr.cmd, pkt->hdr.dev));
+
+        /*
+         * To handle recursiveness, we need to advance the index
+         * index before processing the packet.
+         */
+        s->rx_queue.rpos++;
+        s->rx_queue.rpos %= ARRAY_SIZE(s->rx_queue.pkt);
+        qemu_mutex_unlock(&s->rsp_mutex);
+
+        dev = s->devs[pkt->hdr.dev];
+        if (dev) {
+            rpdc = REMOTE_PORT_DEVICE_GET_CLASS(dev);
+            if (rpdc->ops[pkt->hdr.cmd]) {
+                rpdc->ops[pkt->hdr.cmd](dev, pkt);
+                actioned = true;
+            }
+        }
+
+        switch (pkt->hdr.cmd) {
+        /* TBD */
+        default:
+            assert(actioned);
+        }
+
+        s->rx_queue.inuse[rpos] = false;
+        qemu_sem_post(&s->rx_queue.sem);
+    }
+}
+
+static void rp_event_read(void *opaque)
+{
+    RemotePort *s = REMOTE_PORT(opaque);
+    unsigned char buf[32];
+    ssize_t r;
+
+    /* We don't care about the data. Just read it out to clear the event.  */
+    do {
+#ifdef _WIN32
+        r = qemu_recv_wrap(s->event.pipe.read, buf, sizeof buf, 0);
+#else
+        r = read(s->event.pipe.read, buf, sizeof buf);
+#endif
+        if (r == 0) {
+            return;
+        }
+    } while (r == sizeof buf || (r < 0 && errno == EINTR));
+
+    rp_process(s);
+}
+
+static void rp_event_notify(RemotePort *s)
+{
+    unsigned char d = 0;
+    ssize_t r;
+
+#ifdef _WIN32
+    /* Mingw is sensitive about doing write's to socket descriptors.  */
+    r = qemu_send_wrap(s->event.pipe.write, &d, sizeof d, 0);
+#else
+    r = qemu_write_full(s->event.pipe.write, &d, sizeof d);
+#endif
+    if (r == 0) {
+        hw_error("%s: pipe closed\n", s->prefix);
+    }
+}
+
+/* Handover a pkt to CPU or IO-thread context.  */
+static void rp_pt_handover_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
+{
+    bool full;
+
+    /*
+     * Take the rsp lock around the wpos update, otherwise
+     * rp_wait_resp will race with us.
+     */
+    qemu_mutex_lock(&s->rsp_mutex);
+    s->rx_queue.wpos++;
+    s->rx_queue.wpos %= ARRAY_SIZE(s->rx_queue.pkt);
+    /*
+     * Ensure rx_queue index update is visible to consumer
+     * before signaling event, to prevent lost wakeup
+     */
+    smp_mb();
+    rp_event_notify(s);
+    qemu_cond_signal(&s->progress_cond);
+    qemu_mutex_unlock(&s->rsp_mutex);
+
+    do {
+        full = s->rx_queue.inuse[s->rx_queue.wpos];
+        if (full) {
+            qemu_log("%s: FULL rx queue %d\n", __func__, s->rx_queue.wpos);
+        if (qemu_sem_timedwait(&s->rx_queue.sem, 2 * 1000) != 0) {
+#ifndef _WIN32
+                int sval;
+
+#ifndef CONFIG_SEM_TIMEDWAIT
+                sval = s->rx_queue.sem.count;
+#else
+                sem_getvalue(&s->rx_queue.sem.sem, &sval);
+#endif
+                qemu_log("semwait: %d rpos=%u wpos=%u\n", sval,
+                         s->rx_queue.rpos, s->rx_queue.wpos);
+#endif
+                qemu_log("Deadlock?\n");
+        }
+        }
+    } while (full);
+}
+
 static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
 {
     struct rp_pkt *pkt = dpkt->pkt;
@@ -208,7 +345,7 @@ static bool rp_pt_process_pkt(RemotePort *s, 
RemotePortDynPkt *dpkt)
     case RP_CMD_interrupt:
     case RP_CMD_ats_req:
     case RP_CMD_ats_inv:
-        /* TBD */;
+        rp_pt_handover_pkt(s, dpkt);
         break;
     default:
         g_assert_not_reached();
@@ -312,6 +449,8 @@ static void rp_realize(DeviceState *dev, Error **errp)
     s->prefix = object_get_canonical_path(OBJECT(dev));
 
     qemu_mutex_init(&s->write_mutex);
+    qemu_mutex_init(&s->rsp_mutex);
+    qemu_cond_init(&s->progress_cond);
 
     if (!qemu_chr_fe_get_driver(&s->chr)) {
         char *name;
@@ -413,6 +552,7 @@ static void rp_realize(DeviceState *dev, Error **errp)
                         s->prefix);
             exit(EXIT_FAILURE);
         }
+        qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s);
     }
 #else
     if (!g_unix_open_pipe(s->event.pipes, FD_CLOEXEC, NULL)) {
@@ -427,7 +567,10 @@ static void rp_realize(DeviceState *dev, Error **errp)
         exit(EXIT_FAILURE);
     }
 
+    qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s);
 #endif
+
+    qemu_sem_init(&s->rx_queue.sem, ARRAY_SIZE(s->rx_queue.pkt) - 1);
 }
 
 static void rp_unrealize(DeviceState *dev)
@@ -436,6 +579,9 @@ static void rp_unrealize(DeviceState *dev)
 
     s->finalizing = true;
 
+    /* Unregister handler.  */
+    qemu_set_fd_handler(s->event.pipe.read, NULL, NULL, s);
+
     info_report("%s: Wait for remote-port to disconnect", s->prefix);
     qemu_chr_fe_disconnect(&s->chr);
     qemu_thread_join(&s->thread);
diff --git a/include/hw/core/remote-port.h b/include/hw/core/remote-port.h
index b88e523894..21dfbe89cd 100644
--- a/include/hw/core/remote-port.h
+++ b/include/hw/core/remote-port.h
@@ -74,6 +74,9 @@ struct RemotePort {
     char *chrdev_id;
     struct rp_peer_state peer;
 
+    QemuMutex rsp_mutex;
+    QemuCond progress_cond;
+
 #define RX_QUEUE_SIZE 1024
     struct {
         /* This array must be sized minimum 2 and always a power of 2.  */
@@ -100,6 +103,8 @@ struct RemotePort {
     RemotePortDevice *devs[REMOTE_PORT_MAX_DEVS];
 };
 
+void rp_process(RemotePort *s);
+
 ssize_t rp_write(RemotePort *s, const void *buf, size_t count);
 
 #endif
-- 
2.43.0


Reply via email to