commit:     bd234d861abb0da8071d067d006a3429a47a1563
Author:     Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Sun Nov 16 20:00:19 2025 +0000
Commit:     Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Sun Nov 16 20:05:31 2025 +0000
URL:        https://gitweb.gentoo.org/proj/steve.git/commit/?id=bd234d86

Initial support for read waiters

Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>

 meson.build |  3 ++-
 steve.c     | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/meson.build b/meson.build
index 8f9e5da..cf63031 100644
--- a/meson.build
+++ b/meson.build
@@ -1,6 +1,7 @@
 project('steve', 'c')
 
 fuse3 = dependency('fuse3')
+libbsd = dependency('libbsd')
 
 executable('steve', ['steve.c'],
-           dependencies: fuse3)
+           dependencies: [fuse3, libbsd])

diff --git a/steve.c b/steve.c
index 3002016..20ae4af 100644
--- a/steve.c
+++ b/steve.c
@@ -17,6 +17,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/queue.h>
 #include <unistd.h>
 #include <errno.h>
 
@@ -32,10 +33,19 @@ static const char *usage =
 "    -s                    disable multi-threaded operation\n"
 "\n";
 
+struct steve_read_waiter {
+       fuse_req_t req;
+       pid_t pid;
+       SLIST_ENTRY(steve_read_waiter) waiters;
+};
+
+SLIST_HEAD(steve_read_waiters_head, steve_read_waiter);
+
 struct steve_state {
        bool verbose;
        unsigned int jobs;
        unsigned int tokens;
+       struct steve_read_waiters_head read_waiters;
 };
 
 enum steve_arg_keys {
@@ -85,20 +95,54 @@ static void steve_init(void *userdata, struct 
fuse_conn_info *conn)
        conn->no_interrupt = 1;
 
        state->tokens = state->jobs;
+       SLIST_INIT(&state->read_waiters);
 
        fprintf(stderr, "steve running on /dev/steve for %d jobs\n", 
state->jobs);
 }
 
+static void steve_destroy(void *userdata)
+{
+       struct steve_state *state = userdata;
+
+       while (!SLIST_EMPTY(&state->read_waiters)) {
+               struct steve_read_waiter *read_waiter = 
SLIST_FIRST(&state->read_waiters);
+               SLIST_REMOVE_HEAD(&state->read_waiters, waiters);
+               free(read_waiter);
+       }
+       SLIST_INIT(&state->read_waiters);
+}
+
 static void steve_open(fuse_req_t req, struct fuse_file_info *fi)
 {
        fuse_reply_open(req, fi);
 }
 
+static void steve_interrupt(fuse_req_t req, void *userdata)
+{
+       struct steve_state *state = userdata;
+       struct steve_read_waiter *it, *read_waiter;
+
+       fuse_reply_err(req, EINTR);
+       SLIST_FOREACH(it, &state->read_waiters, waiters) {
+               if (it->req == req) {
+                       read_waiter = it;
+                       break;
+               }
+       }
+       /* TODO: can this ever trigger? */
+       if (read_waiter) {
+               if (state->verbose)
+                       printf("Passed EINTR to PID %d\n", read_waiter->pid);
+               SLIST_REMOVE(&state->read_waiters, read_waiter, 
steve_read_waiter, waiters);
+       }
+}
+
 static void steve_read(
        fuse_req_t req, size_t size, off_t off, struct fuse_file_info *fi)
 {
        const struct fuse_ctx *context = fuse_req_ctx(req);
        struct steve_state *state = fuse_req_userdata(req);
+       struct steve_read_waiter *read_waiter;
 
        if (off != 0) {
                fuse_reply_err(req, EIO);
@@ -124,8 +168,34 @@ static void steve_read(
                return;
        }
 
-       /* TODO: implement waiting */
-       fuse_reply_err(req, EIO);
+       read_waiter = malloc(sizeof(struct steve_read_waiter));
+       if (!read_waiter) {
+               fuse_reply_err(req, ENOMEM);
+               return;
+       }
+
+       read_waiter->req = req;
+       read_waiter->pid = context->pid;
+       /* TODO: append to end to make it FIFO */
+       SLIST_INSERT_HEAD(&state->read_waiters, read_waiter, waiters);
+       if (state->verbose)
+               printf("No free job token for PID %d, waiting\n", context->pid);
+       fuse_req_interrupt_func(req, steve_interrupt, state);
+}
+
+static void steve_wake_waiters(struct steve_state *state)
+{
+       while (state->tokens > 0 && !SLIST_EMPTY(&state->read_waiters)) {
+               struct steve_read_waiter *read_waiter = SLIST_FIRST(
+                               &state->read_waiters);
+
+               state->tokens--;
+               if (state->verbose)
+                       printf("Giving job token to PID %d, %d left\n",
+                                       read_waiter->pid, state->tokens);
+               fuse_reply_buf(read_waiter->req, "+", 1);
+               SLIST_REMOVE_HEAD(&state->read_waiters, waiters);
+       }
 }
 
 static void steve_write(
@@ -145,10 +215,14 @@ static void steve_write(
                printf("PID %d returned %zd tokens, %d available now\n",
                                context->pid, size, state->tokens);
        fuse_reply_write(req, size);
+
+       /* Since we have jobs now, see if anyone's waiting */
+       steve_wake_waiters(state);
 }
 
 static const struct cuse_lowlevel_ops steve_ops = {
        .init = steve_init,
+       .destroy = steve_destroy,
        .open = steve_open,
        .read = steve_read,
        .write = steve_write,

Reply via email to