commit:     b8d87a922359b2243276c9377f3971a0aa5ff768
Author:     Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Sun Dec  7 18:41:51 2025 +0000
Commit:     Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Sun Dec  7 18:43:42 2025 +0000
URL:        https://gitweb.gentoo.org/proj/steve.git/commit/?id=b8d87a92

Support reserving tokens for poll clients

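Reserve a token for a poll client when it is woken up, to ensure that it
can still get one even if there are blocking reads queued.  Otherwise,
blocking reads could consume all the available tokens before the poller
gets a chance to actually read.  A reserved token is handed out on the
client's next read (even if the load limit is exceeded by then), and is
returned to the pool if the process exits without reading it.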

Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>

 steve.cxx | 70 +++++++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 53 insertions(+), 17 deletions(-)

diff --git a/steve.cxx b/steve.cxx
index c04e2f2..60bb3a5 100644
--- a/steve.cxx
+++ b/steve.cxx
@@ -74,6 +74,7 @@ struct steve_waiter {
 struct steve_process {
        int pid_fd{-1};
        ssize_t tokens_held{0};
+       bool token_reserved{false};
        std::unique_ptr<struct event, std::function<void(struct event*)>> event;
 
        ~steve_process() {
@@ -105,8 +106,12 @@ enum class steve_token_availability {
        load_exceeded,
 };
 
-static steve_token_availability steve_can_give_token(steve_state *state)
+static steve_token_availability steve_can_give_token(steve_state *state, 
uint64_t pid)
 {
+       /* if there is a token reserved, we give it immediately (even if load 
is exceeded now) */
+       if (state->processes[pid].token_reserved)
+               return steve_token_availability::available;
+
        if (state->tokens <= 0)
                return steve_token_availability::no_tokens;
        if (state->max_load_avg > 0) {
@@ -127,23 +132,52 @@ static steve_token_availability 
steve_can_give_token(steve_state *state)
 
 static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
 {
+       if (state->processes[pid].token_reserved) {
+               state->processes[pid].tokens_held++;
+               state->processes[pid].token_reserved = false;
+               if (state->verbose)
+                       std::print(stderr, "Giving reserved token to PID {}, {} 
left, {} tokens held by process\n",
+                                       pid, state->tokens, 
state->processes[pid].tokens_held);
+               fuse_reply_buf(req, "+", 1);
+               return;
+       }
+
        state->tokens--;
        state->processes[pid].tokens_held++;
        if (state->verbose) {
                if (state->max_load_avg > 0)
-                       std::print(stderr, "Giving job token to PID {}, {} 
left, {} tokens held by process, load average = {:.3} (limit: {})\n",
+                       std::print(stderr, "Giving job token to PID {}, {} 
left, {} tokens held by process, token reserved: {}, load average = {:.3} 
(limit: {})\n",
+                                       pid, state->tokens, 
state->processes[pid].tokens_held, state->processes[pid].token_reserved, 
state->load_avg, state->max_load_avg);
+               else
+                       std::print(stderr, "Giving job token to PID {}, {} 
left, {} tokens held by process, token reserved: {}\n",
+                                       pid, state->tokens, 
state->processes[pid].tokens_held, state->processes[pid].token_reserved);
+       }
+       fuse_reply_buf(req, "+", 1);
+}
+
+static void steve_reserve_token(steve_state *state, uint64_t pid)
+{
+       if (state->processes[pid].token_reserved)
+               return;
+
+       state->tokens--;
+       state->processes[pid].token_reserved = true;
+       if (state->verbose) {
+               if (state->max_load_avg > 0)
+                       std::print(stderr, "Reserving job token for PID {}, {} 
left, {} tokens held by process, load average = {:.3} (limit: {})\n",
                                        pid, state->tokens, 
state->processes[pid].tokens_held, state->load_avg, state->max_load_avg);
                else
-                       std::print(stderr, "Giving job token to PID {}, {} 
left, {} tokens held by process\n",
+                       std::print(stderr, "Reserving job token for PID {}, {} 
left, {} tokens held by process\n",
                                        pid, state->tokens, 
state->processes[pid].tokens_held);
        }
-       fuse_reply_buf(req, "+", 1);
+
+       /* TODO: we need to handle expiring reservations if client doesn't read 
*/
 }
 
 static void steve_wake_waiters(steve_state *state)
 {
        for (auto it = state->waiters.begin(); it != state->waiters.end();) {
-               if (steve_can_give_token(state) != 
steve_token_availability::available)
+               if (steve_can_give_token(state, it->pid) != 
steve_token_availability::available)
                        break;
 
                if (fuse_req_t *read_req = 
std::get_if<fuse_req_t>(&it->handle)) {
@@ -151,9 +185,9 @@ static void steve_wake_waiters(steve_state *state)
                        steve_give_token(state, *read_req, it->pid);
                } else if (fuse_pollhandle **poll_handle = 
std::get_if<fuse_pollhandle *>(&it->handle)) {
                        /* poll request */
+                       steve_reserve_token(state, it->pid);
                        if (state->verbose)
-                               std::print(stderr, "Notifying PID {} about 
POLLIN, {} tokens left, {} tokens held by process\n",
-                                               it->pid, state->tokens, 
state->processes[it->pid].tokens_held);
+                               std::print(stderr, "Notifying PID {} about 
POLLIN\n", it->pid);
                        fuse_lowlevel_notify_poll(*poll_handle);
                } else
                        assert(0 && "invalid waiter");
@@ -168,10 +202,12 @@ static void steve_handle_pidfd(evutil_socket_t pid_fd, 
short, void *userdata) {
        for (auto it = state->processes.begin(); it != state->processes.end(); 
++it) {
                if (it->second.pid_fd == pid_fd) {
                        state->tokens += it->second.tokens_held;
+                       if (it->second.token_reserved)
+                               ++state->tokens;
                        if (state->verbose || it->second.tokens_held > 0) {
-                               std::print(stderr, "Process {} exited while 
holding {} tokens, "
+                               std::print(stderr, "Process {} exited while 
holding {} tokens, token reserved: {}, "
                                                "{} tokens available after 
returning them\n",
-                                               it->first, 
it->second.tokens_held, state->tokens);
+                                               it->first, 
it->second.tokens_held, it->second.token_reserved, state->tokens);
                        }
                        state->processes.erase(it);
                        steve_wake_waiters(state);
@@ -312,7 +348,7 @@ static void steve_read(
        }
 
        /* no need to support reading more than one token at a time */
-       steve_token_availability token_avail = steve_can_give_token(state);
+       steve_token_availability token_avail = steve_can_give_token(state, 
fi->fh);
        if (token_avail == steve_token_availability::available) {
                steve_give_token(state, req, fi->fh);
                return;
@@ -327,12 +363,12 @@ static void steve_read(
        if (state->verbose) {
                if (token_avail == steve_token_availability::load_exceeded) {
                        std::print(stderr, "Load exceeded while PID {} 
requested token, waiting, {} tokens free, "
-                                       "{} tokens held by process, load 
average {:.3} >= {}\n",
-                                       fi->fh, state->tokens, 
state->processes[fi->fh].tokens_held,
+                                       "{} tokens held by process, token 
reserved: {}, load average {:.3} >= {}\n",
+                                       fi->fh, state->tokens, 
state->processes[fi->fh].tokens_held, state->processes[fi->fh].token_reserved,
                                        state->load_avg, state->max_load_avg);
                } else
-                       std::print(stderr, "No free job token for PID {}, 
waiting, {} tokens held by process\n",
-                                       fi->fh, 
state->processes[fi->fh].tokens_held);
+                       std::print(stderr, "No free job token for PID {}, 
waiting, {} tokens held by process, token reserved: {}\n",
+                                       fi->fh, 
state->processes[fi->fh].tokens_held, state->processes[fi->fh].token_reserved);
        }
        fuse_req_interrupt_func(req, steve_interrupt, state);
 }
@@ -374,8 +410,8 @@ static void steve_write(
        state->tokens += size;
        state->processes[fi->fh].tokens_held -= size;
        if (state->verbose)
-               std::print(stderr, "PID {} returned {} tokens, {} available 
now, {} tokens held by process\n",
-                               fi->fh, size, state->tokens, 
state->processes[fi->fh].tokens_held);
+               std::print(stderr, "PID {} returned {} tokens, {} available 
now, {} tokens held by process, token reserved: {}\n",
+                               fi->fh, size, state->tokens, 
state->processes[fi->fh].tokens_held, state->processes[fi->fh].token_reserved);
        fuse_reply_write(req, size);
 
        /* Since we have jobs now, see if anyone's waiting */
@@ -389,7 +425,7 @@ static void steve_poll(
        int events = fi->poll_events & (POLLIN | POLLOUT);
 
        /* POLLOUT is always possible, POLLIN only if we have any tokens */
-       steve_token_availability token_avail = steve_can_give_token(state);
+       steve_token_availability token_avail = steve_can_give_token(state, 
fi->fh);
        if (token_avail != steve_token_availability::available) {
                state->waiters.emplace_back(ph, fi->fh);
                events &= ~POLLIN;

Reply via email to