commit: 47d41c9be232cd5d534ae3f7ed5b81ceda739a02
Author: Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Sun Dec 7 18:27:31 2025 +0000
Commit: Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Sun Dec 7 18:27:31 2025 +0000
URL: https://gitweb.gentoo.org/proj/steve.git/commit/?id=47d41c9b
Use a single FIFO queue for all waiters
Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>
steve.cxx | 49 ++++++++++++++++++++++++++-----------------------
1 file changed, 26 insertions(+), 23 deletions(-)
diff --git a/steve.cxx b/steve.cxx
index 9280d88..c04e2f2 100644
--- a/steve.cxx
+++ b/steve.cxx
@@ -89,8 +89,7 @@ struct steve_state {
double max_load_avg{-1}; /* < 0 implies no load average */
double load_avg;
int64_t tokens;
- std::deque<steve_waiter> read_waiters;
- std::deque<steve_waiter> poll_waiters;
+ std::deque<steve_waiter> waiters;
std::unordered_map<uint64_t, steve_process> processes;
struct event_base *evb;
@@ -143,20 +142,23 @@ static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
static void steve_wake_waiters(steve_state *state)
{
- while (!state->read_waiters.empty() && steve_can_give_token(state) ==
steve_token_availability::available) {
- const steve_waiter *read_waiter = &state->read_waiters.front();
- steve_give_token(state,
std::get<fuse_req_t>(read_waiter->handle), read_waiter->pid);
- state->read_waiters.pop_front();
- }
+ for (auto it = state->waiters.begin(); it != state->waiters.end();) {
+ if (steve_can_give_token(state) !=
steve_token_availability::available)
+ break;
- if (steve_can_give_token(state) == steve_token_availability::available)
{
- for (auto &poll_waiter : state->poll_waiters) {
+ if (fuse_req_t *read_req =
std::get_if<fuse_req_t>(&it->handle)) {
+ /* read request */
+ steve_give_token(state, *read_req, it->pid);
+ } else if (fuse_pollhandle **poll_handle =
std::get_if<fuse_pollhandle *>(&it->handle)) {
+ /* poll request */
if (state->verbose)
std::print(stderr, "Notifying PID {} about
POLLIN, {} tokens left, {} tokens held by process\n",
- poll_waiter.pid, state->tokens,
state->processes[poll_waiter.pid].tokens_held);
- fuse_lowlevel_notify_poll(std::get<fuse_pollhandle
*>(poll_waiter.handle));
- }
- state->poll_waiters.clear();
+ it->pid, state->tokens,
state->processes[it->pid].tokens_held);
+ fuse_lowlevel_notify_poll(*poll_handle);
+ } else
+ assert(0 && "invalid waiter");
+
+ it = state->waiters.erase(it);
}
}
@@ -197,8 +199,7 @@ static void steve_destroy(void *userdata)
{
steve_state *state = static_cast<steve_state *>(userdata);
- state->read_waiters.clear();
- state->poll_waiters.clear();
+ state->waiters.clear();
state->processes.clear();
}
@@ -284,12 +285,14 @@ static void steve_interrupt(fuse_req_t req, void *userdata)
steve_state *state = static_cast<steve_state *>(userdata);
fuse_reply_err(req, EINTR);
- for (auto it = state->read_waiters.begin(); it !=
state->read_waiters.end(); ++it) {
- if (std::get<fuse_req_t>(it->handle) == req) {
- if (state->verbose)
- std::print(stderr, "Passed EINTR to PID {}\n",
it->pid);
- state->read_waiters.erase(it);
- break;
+ for (auto it = state->waiters.begin(); it != state->waiters.end();
++it) {
+ if (fuse_req_t *read_req =
std::get_if<fuse_req_t>(&it->handle)) {
+ if (*read_req == req) {
+ if (state->verbose)
+ std::print(stderr, "Passed EINTR to PID
{}\n", it->pid);
+ state->waiters.erase(it);
+ break;
+ }
}
}
}
@@ -320,7 +323,7 @@ static void steve_read(
return;
}
- state->read_waiters.emplace_back(steve_waiter{req, fi->fh});
+ state->waiters.emplace_back(steve_waiter{req, fi->fh});
if (state->verbose) {
if (token_avail == steve_token_availability::load_exceeded) {
std::print(stderr, "Load exceeded while PID {}
requested token, waiting, {} tokens free, "
@@ -388,7 +391,7 @@ static void steve_poll(
/* POLLOUT is always possible, POLLIN only if we have any tokens */
steve_token_availability token_avail = steve_can_give_token(state);
if (token_avail != steve_token_availability::available) {
- state->poll_waiters.emplace_back(ph, fi->fh);
+ state->waiters.emplace_back(ph, fi->fh);
events &= ~POLLIN;
}