From: Ruslan Ruslichenko <[email protected]> Add infrastructure to handle transaction responses from remote peers.
This patch enables QEMU to wait for replies when it initiates a memory access (Read/Write). It implements a mechanism to track outstanding transaction IDs and block the execution thread until the matching response is received. Signed-off-by: Edgar E. Iglesias <[email protected]> Signed-off-by: Takahiro Nakata <[email protected]> Signed-off-by: Ruslan Ruslichenko <[email protected]> --- hw/core/remote-port.c | 136 +++++++++++++++++++++++++++++++++- include/hw/core/remote-port.h | 29 ++++++++ 2 files changed, 163 insertions(+), 2 deletions(-) diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c index e44d9249c3..00c9529348 100644 --- a/hw/core/remote-port.c +++ b/hw/core/remote-port.c @@ -59,6 +59,16 @@ static void rp_pkt_dump(const char *prefix, const char *buf, size_t len) qemu_hexdump(stdout, prefix, buf, len); } +void rp_rsp_mutex_lock(RemotePort *s) +{ + qemu_mutex_lock(&s->rsp_mutex); +} + +void rp_rsp_mutex_unlock(RemotePort *s) +{ + qemu_mutex_unlock(&s->rsp_mutex); +} + static void rp_fatal_error(RemotePort *s, const char *reason) { error_report("%s: %s", s->prefix, reason); @@ -104,6 +114,80 @@ static unsigned int rp_has_work(RemotePort *s) return work; } +/* Response handling. */ +RemotePortRespSlot *rp_dev_timed_wait_resp(RemotePort *s, uint32_t dev, + uint32_t id, int timems) +{ + int i; + + assert(s->devs[dev]); + + /* Find a free slot. */ + for (i = 0; i < ARRAY_SIZE(s->dev_state[dev].rsp_queue); i++) { + if (s->dev_state[dev].rsp_queue[i].used == false) { + break; + } + } + + if (i == ARRAY_SIZE(s->dev_state[dev].rsp_queue) || + s->dev_state[dev].rsp_queue[i].used == true) { + error_report("Number of outstanding transactions exceeded! %d", + RP_MAX_OUTSTANDING_TRANSACTIONS); + rp_fatal_error(s, "Internal error"); + } + + /* Got a slot, fill it in. 
*/ + s->dev_state[dev].rsp_queue[i].id = id; + s->dev_state[dev].rsp_queue[i].valid = false; + s->dev_state[dev].rsp_queue[i].used = true; + + while (!s->dev_state[dev].rsp_queue[i].valid) { + rp_rsp_mutex_unlock(s); + rp_event_read(s); + rp_rsp_mutex_lock(s); + if (s->dev_state[dev].rsp_queue[i].valid) { + break; + } + if (!rp_has_work(s)) { + if (timems) { + if (!qemu_cond_timedwait(&s->progress_cond, &s->rsp_mutex, + timems)) { + /* + * TimeOut! + */ + break; + } + } else { + qemu_cond_wait(&s->progress_cond, &s->rsp_mutex); + } + } + } + return &s->dev_state[dev].rsp_queue[i]; +} + +RemotePortRespSlot *rp_dev_wait_resp(RemotePort *s, uint32_t dev, uint32_t id) +{ + return rp_dev_timed_wait_resp(s, dev, id, 0); +} + +RemotePortDynPkt rp_wait_resp(RemotePort *s) +{ + while (!rp_dpkt_is_valid(&s->rspqueue)) { + rp_rsp_mutex_unlock(s); + rp_event_read(s); + rp_rsp_mutex_lock(s); + /* Need to recheck the condition with the rsp lock taken. */ + if (rp_dpkt_is_valid(&s->rspqueue)) { + break; + } + D(qemu_log("%s: wait for progress\n", __func__)); + if (!rp_has_work(s)) { + qemu_cond_wait(&s->progress_cond, &s->rsp_mutex); + } + } + return s->rspqueue; +} + static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt) { s->peer.version = pkt->hello.version; @@ -328,14 +412,52 @@ static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt) { struct rp_pkt *pkt = dpkt->pkt; - D(qemu_log("%s: cmd=%x id=%d dev=%d\n", __func__, pkt->hdr.cmd, - pkt->hdr.id, pkt->hdr.dev)); + D(qemu_log("%s: cmd=%x id=%d dev=%d rsp=%d\n", __func__, pkt->hdr.cmd, + pkt->hdr.id, pkt->hdr.dev, + pkt->hdr.flags & RP_PKT_FLAGS_response)); if (pkt->hdr.dev >= ARRAY_SIZE(s->devs)) { /* FIXME: Respond with an error. 
*/ return true; } + if (pkt->hdr.flags & RP_PKT_FLAGS_response) { + uint32_t dev = pkt->hdr.dev; + uint32_t id = pkt->hdr.id; + int i; + + if (pkt->hdr.flags & RP_PKT_FLAGS_posted) { + printf("Drop response for posted packets\n"); + return true; + } + + qemu_mutex_lock(&s->rsp_mutex); + + /* Try to find a per-device slot first. */ + for (i = 0; i < ARRAY_SIZE(s->dev_state[dev].rsp_queue); i++) { + if (s->devs[dev] && s->dev_state[dev].rsp_queue[i].used == true + && s->dev_state[dev].rsp_queue[i].id == id) { + break; + } + } + + if (i < ARRAY_SIZE(s->dev_state[dev].rsp_queue)) { + /* Found a per device one. */ + assert(s->dev_state[dev].rsp_queue[i].valid == false); + + rp_dpkt_swap(&s->dev_state[dev].rsp_queue[i].rsp, dpkt); + s->dev_state[dev].rsp_queue[i].valid = true; + + qemu_cond_signal(&s->progress_cond); + } else { + rp_dpkt_swap(&s->rspqueue, dpkt); + qemu_cond_signal(&s->progress_cond); + } + + qemu_mutex_unlock(&s->rsp_mutex); + return true; + } + switch (pkt->hdr.cmd) { case RP_CMD_hello: rp_cmd_hello(s, pkt); @@ -389,6 +511,7 @@ static void *rp_protocol_thread(void *arg) /* Make sure we have a decent bufsize to start with. 
*/ rp_dpkt_alloc(&s->rsp, sizeof s->rsp.pkt->busaccess + 1024); + rp_dpkt_alloc(&s->rspqueue, sizeof s->rspqueue.pkt->busaccess + 1024); for (i = 0; i < ARRAY_SIZE(s->rx_queue.pkt); i++) { rp_dpkt_alloc(&s->rx_queue.pkt[i], sizeof s->rx_queue.pkt[i].pkt->busaccess + 1024); @@ -614,6 +737,7 @@ static void rp_prop_allow_set_link(const Object *obj, const char *name, static void rp_init(Object *obj) { RemotePort *s = REMOTE_PORT(obj); + int t; int i; for (i = 0; i < REMOTE_PORT_MAX_DEVS; ++i) { @@ -623,6 +747,14 @@ static void rp_init(Object *obj) rp_prop_allow_set_link, OBJ_PROP_LINK_STRONG); g_free(name); + + + for (t = 0; t < RP_MAX_OUTSTANDING_TRANSACTIONS; t++) { + s->dev_state[i].rsp_queue[t].used = false; + s->dev_state[i].rsp_queue[t].valid = false; + rp_dpkt_alloc(&s->dev_state[i].rsp_queue[t].rsp, + sizeof s->dev_state[i].rsp_queue[t].rsp.pkt->busaccess + 1024); + } } } diff --git a/include/hw/core/remote-port.h b/include/hw/core/remote-port.h index 21dfbe89cd..c1b21eb573 100644 --- a/include/hw/core/remote-port.h +++ b/include/hw/core/remote-port.h @@ -53,6 +53,13 @@ typedef struct RemotePortDeviceClass { #define TYPE_REMOTE_PORT "remote-port" #define REMOTE_PORT(obj) OBJECT_CHECK(RemotePort, (obj), TYPE_REMOTE_PORT) +typedef struct RemotePortRespSlot { + RemotePortDynPkt rsp; + uint32_t id; + bool used; + bool valid; +} RemotePortRespSlot; + struct RemotePort { DeviceState parent; @@ -93,6 +100,13 @@ struct RemotePort { */ RemotePortDynPkt rsp; + /* + * rspqueue holds received responses from the remote side. + * Only one for the moment but it might grow. + * Used by the master. 
+ */ + RemotePortDynPkt rspqueue; + const char *prefix; const char *remote_prefix; @@ -100,9 +114,24 @@ struct RemotePort { bool reset_done; #define REMOTE_PORT_MAX_DEVS 1024 +#define RP_MAX_OUTSTANDING_TRANSACTIONS 32 + struct { + RemotePortRespSlot rsp_queue[RP_MAX_OUTSTANDING_TRANSACTIONS]; + } dev_state[REMOTE_PORT_MAX_DEVS]; + RemotePortDevice *devs[REMOTE_PORT_MAX_DEVS]; }; + +void rp_rsp_mutex_lock(RemotePort *s); +void rp_rsp_mutex_unlock(RemotePort *s); + +RemotePortDynPkt rp_wait_resp(RemotePort *s); + +RemotePortRespSlot *rp_dev_wait_resp(RemotePort *s, uint32_t dev, uint32_t id); +RemotePortRespSlot *rp_dev_timed_wait_resp(RemotePort *s, uint32_t dev, + uint32_t id, int timems); + void rp_process(RemotePort *s); ssize_t rp_write(RemotePort *s, const void *buf, size_t count); -- 2.43.0
