commit:     b2faaf8cba3617b6d2f182f6b67fa133ef7f9fa2
Author:     Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Thu Dec 18 09:51:21 2025 +0000
Commit:     Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Thu Dec 18 09:51:21 2025 +0000
URL:        https://gitweb.gentoo.org/proj/steve.git/commit/?id=b2faaf8c

Use unique tokens to track specific jobs and report their runtimes

Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>

 steve.cxx | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 80 insertions(+), 16 deletions(-)

diff --git a/steve.cxx b/steve.cxx
index ae5013b..cef98d8 100644
--- a/steve.cxx
+++ b/steve.cxx
@@ -11,6 +11,7 @@
 #define FUSE_USE_VERSION 31
 
 #include <cassert>
+#include <chrono>
 #include <cstddef>
 #include <cstdio>
 #include <cstdlib>
@@ -76,7 +77,12 @@ struct steve_waiter {
        }
 };
 
+struct steve_job_info {
+       std::chrono::time_point<std::chrono::steady_clock> start_time;
+};
+
 typedef std::unique_ptr<struct event, std::function<void(struct event*)>> event_ptr;
+typedef std::unordered_map<uint64_t, steve_job_info> running_job_map;
 
 struct steve_process {
        int pid_fd{-1};
@@ -84,6 +90,7 @@ struct steve_process {
        bool token_reserved{false};
        event_ptr pidfd_event;
        std::optional<char> extra_token;
+       running_job_map running_jobs;
 
        ~steve_process() {
                if (pid_fd != -1)
@@ -211,13 +218,22 @@ static steve_token_availability steve_can_give_token(steve_state *state, uint64_
 
 static char steve_get_token_char(steve_process *process)
 {
+       int64_t job_num;
+
        if (process->extra_token.has_value()) {
-               char ret = process->extra_token.value();
+               assert(process->running_jobs.empty());
+               job_num = process->extra_token.value();
                process->extra_token.reset();
-               return ret;
+       } else {
+               /* find first free token */
+               /* TODO: can we optimize this? */
+               for (job_num = 0; process->running_jobs.contains(job_num); ++job_num);
        }
 
-       return '+';
+       process->running_jobs.emplace(
+               job_num, steve_job_info{std::chrono::steady_clock::now()}
+       );
+       return job_num & 0xFF;
 }
 
 static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
@@ -228,7 +244,7 @@ static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
                state->processes[pid].tokens_held++;
                state->processes[pid].token_reserved = false;
                if (state->verbose)
-                       std::print(stderr, "Giving reserved token 0x{:2x} to PID {}, {} left, {} tokens held by process\n",
+                       std::print(stderr, "Giving reserved token 0x{:02x} to PID {}, {} left, {} tokens held by process\n",
                                        token, pid, state->tokens, state->processes[pid].tokens_held);
                fuse_reply_buf(req, &token, 1);
                return;
@@ -238,10 +254,10 @@ static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
        state->processes[pid].tokens_held++;
        if (state->verbose) {
                if (state->max_load_avg > 0)
-                       std::print(stderr, "Giving job token 0x{:2x} to PID {}, {} left, {} tokens held by process, token reserved: {}, load average = {:.3} (limit: {})\n",
+                       std::print(stderr, "Giving job token 0x{:02x} to PID {}, {} left, {} tokens held by process, token reserved: {}, load average = {:.3} (limit: {})\n",
                                        token, pid, state->tokens, state->processes[pid].tokens_held, state->processes[pid].token_reserved, state->load_avg, state->max_load_avg);
                else
-                       std::print(stderr, "Giving job token 0x{:2x} to PID {}, {} left, {} tokens held by process, token reserved: {}\n",
+                       std::print(stderr, "Giving job token 0x{:02x} to PID {}, {} left, {} tokens held by process, token reserved: {}\n",
                                        token, pid, state->tokens, state->processes[pid].tokens_held, state->processes[pid].token_reserved);
        }
        fuse_reply_buf(req, &token, 1);
@@ -510,6 +526,29 @@ static void steve_read(
        fuse_req_interrupt_func(req, steve_interrupt, state);
 }
 
+static running_job_map::iterator steve_find_running_job(steve_process *process, char token)
+{
+       running_job_map::iterator ret = process->running_jobs.end();
+
+       /* Find the first matching token */
+       /* TODO: optimize this */
+       for (auto it = process->running_jobs.begin(); it != process->running_jobs.end(); ++it) {
+               if (static_cast<uint8_t>(it->first & 0xFF) == token) {
+                       if (ret == process->running_jobs.end() || it->second.start_time < ret->second.start_time)
+                               ret = it;
+               }
+       }
+       if (ret != process->running_jobs.end())
+               return ret;
+
+       /* No matching token? We're dealing with a bad client, just return the oldest token. */
+       for (auto it = process->running_jobs.begin(); it != process->running_jobs.end(); ++it) {
+               if (ret == process->running_jobs.end() || it->second.start_time < ret->second.start_time)
+                       ret = it;
+       }
+       return ret;
+}
+
 static void steve_write(
        fuse_req_t req, const char *data, size_t size, off_t off,
        struct fuse_file_info *fi)
@@ -530,16 +569,36 @@ static void steve_write(
        /* workaround for https://github.com/medek/nasm-rs/issues/44 */
        if (state->processes[fi->fh].tokens_held == 0 && size == 1) {
                assert(!state->processes[fi->fh].extra_token.has_value());
+               assert(state->processes[fi->fh].running_jobs.empty());
                state->processes[fi->fh].extra_token = data[0];
-               std::print(stderr, "Warning: process {} pre-released an unacquired token 0x{:2x}, please report a bug upstream\n",
+               std::print(stderr, "Warning: process {} pre-released an unacquired token 0x{:02x}, please report a bug upstream\n",
                                fi->fh, data[0]);
-       } else if (state->processes[fi->fh].tokens_held < static_cast<ssize_t>(size)) {
-               std::print(stderr, "Warning: process {} tried to return {} tokens while holding only {} tokens, capping\n",
-                               fi->fh, size, state->processes[fi->fh].tokens_held);
-               if (state->processes[fi->fh].tokens_held < 0)
-                       size = 0;
-               else
-                       size = state->processes[fi->fh].tokens_held;
+       } else {
+               if (state->processes[fi->fh].tokens_held < static_cast<ssize_t>(size)) {
+                       std::print(stderr, "Warning: process {} tried to return {} tokens while holding only {} tokens, capping\n",
+                                       fi->fh, size, state->processes[fi->fh].tokens_held);
+                       if (state->processes[fi->fh].tokens_held < 0)
+                               size = 0;
+                       else
+                               size = state->processes[fi->fh].tokens_held;
+               }
+
+               /* Finish the running jobs */
+               std::chrono::time_point<std::chrono::steady_clock> current_time =
+                       std::chrono::steady_clock::now();
+               for (const char *token = data; token < data + size; ++token) {
+                       auto it = steve_find_running_job(&state->processes[fi->fh], *token);
+
+                       if (static_cast<uint8_t>(it->first & 0xFF) != *token) {
+                               std::print(stderr, "Warning: process {} returned incorrect token value 0x{:02x}, please report a bug upstream\n",
+                                               fi->fh, *token);
+                       }
+                       if (state->verbose)
+                               std::print(stderr, "PID {} job 0x{:02x} finished after {}\n", fi->fh, it->first,
+                                       std::chrono::duration<double>(current_time - it->second.start_time));
+
+                       state->processes[fi->fh].running_jobs.erase(it);
+               }
        }
        if (size == 0) {
                fuse_reply_err(req, ENOSPC);
@@ -730,11 +789,16 @@ static const struct cuse_lowlevel_ops steve_ops = {
 
 static void steve_handle_sigusr1(evutil_socket_t, short, void *userdata) {
        steve_state *state = static_cast<steve_state *>(userdata);
+       std::chrono::time_point<std::chrono::steady_clock> current_time =
+               std::chrono::steady_clock::now();
 
        std::print(stderr, "steve: currently {} tokens available out of {}\n",
                        state->tokens, state->jobs);
-       for (auto &it : state->processes) {
-               std::print(stderr, "PID {} holds {} tokens\n", it.first, it.second.tokens_held);
+       for (const auto &it : state->processes) {
+               std::print(stderr, "PID {} holds {} tokens:\n", it.first, it.second.tokens_held);
+               for (const auto &jt : it.second.running_jobs)
+                       std::print(stderr, "  job 0x{:02x} running for {}\n", jt.first,
+                               std::chrono::duration<double>(current_time - jt.second.start_time));
        }
 }
 

Reply via email to