commit: b2faaf8cba3617b6d2f182f6b67fa133ef7f9fa2
Author: Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Thu Dec 18 09:51:21 2025 +0000
Commit: Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Thu Dec 18 09:51:21 2025 +0000
URL: https://gitweb.gentoo.org/proj/steve.git/commit/?id=b2faaf8c
Use unique tokens to track specific jobs and report their runtimes
Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>
steve.cxx | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
1 file changed, 80 insertions(+), 16 deletions(-)
diff --git a/steve.cxx b/steve.cxx
index ae5013b..cef98d8 100644
--- a/steve.cxx
+++ b/steve.cxx
@@ -11,6 +11,7 @@
#define FUSE_USE_VERSION 31
#include <cassert>
+#include <chrono>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
@@ -76,7 +77,12 @@ struct steve_waiter {
}
};
+struct steve_job_info { /* bookkeeping stored per entry of running_job_map */
+ std::chrono::time_point<std::chrono::steady_clock> start_time; /* steady-clock instant captured when the job entry is created */
+};
+
typedef std::unique_ptr<struct event, std::function<void(struct event*)>>
event_ptr;
+typedef std::unordered_map<uint64_t, steve_job_info> running_job_map;
struct steve_process {
int pid_fd{-1};
@@ -84,6 +90,7 @@ struct steve_process {
bool token_reserved{false};
event_ptr pidfd_event;
std::optional<char> extra_token;
+ running_job_map running_jobs;
~steve_process() {
if (pid_fd != -1)
@@ -211,13 +218,22 @@ static steve_token_availability
steve_can_give_token(steve_state *state, uint64_
static char steve_get_token_char(steve_process *process)
{
+ int64_t job_num;
+
if (process->extra_token.has_value()) {
- char ret = process->extra_token.value();
+ assert(process->running_jobs.empty());
+ job_num = process->extra_token.value();
process->extra_token.reset();
- return ret;
+ } else {
+ /* find first free token */
+ /* TODO: can we optimize this? */
+ for (job_num = 0; process->running_jobs.contains(job_num);
++job_num);
}
- return '+';
+ process->running_jobs.emplace(
+ job_num, steve_job_info{std::chrono::steady_clock::now()}
+ );
+ return job_num & 0xFF;
}
static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
@@ -228,7 +244,7 @@ static void steve_give_token(steve_state *state, fuse_req_t
req, uint64_t pid)
state->processes[pid].tokens_held++;
state->processes[pid].token_reserved = false;
if (state->verbose)
- std::print(stderr, "Giving reserved token 0x{:2x} to
PID {}, {} left, {} tokens held by process\n",
+ std::print(stderr, "Giving reserved token 0x{:02x} to
PID {}, {} left, {} tokens held by process\n",
token, pid, state->tokens,
state->processes[pid].tokens_held);
fuse_reply_buf(req, &token, 1);
return;
@@ -238,10 +254,10 @@ static void steve_give_token(steve_state *state,
fuse_req_t req, uint64_t pid)
state->processes[pid].tokens_held++;
if (state->verbose) {
if (state->max_load_avg > 0)
- std::print(stderr, "Giving job token 0x{:2x} to PID {},
{} left, {} tokens held by process, token reserved: {}, load average = {:.3}
(limit: {})\n",
+ std::print(stderr, "Giving job token 0x{:02x} to PID
{}, {} left, {} tokens held by process, token reserved: {}, load average =
{:.3} (limit: {})\n",
token, pid, state->tokens,
state->processes[pid].tokens_held, state->processes[pid].token_reserved,
state->load_avg, state->max_load_avg);
else
- std::print(stderr, "Giving job token 0x{:2x} to PID {},
{} left, {} tokens held by process, token reserved: {}\n",
+ std::print(stderr, "Giving job token 0x{:02x} to PID
{}, {} left, {} tokens held by process, token reserved: {}\n",
token, pid, state->tokens,
state->processes[pid].tokens_held, state->processes[pid].token_reserved);
}
fuse_reply_buf(req, &token, 1);
@@ -510,6 +526,29 @@ static void steve_read(
fuse_req_interrupt_func(req, steve_interrupt, state);
}
+/*
+ * Find the running job of PROCESS that a returned TOKEN byte refers to.
+ *
+ * A token carries only the low 8 bits of the job number, so several
+ * jobs can share one token value; among those the job with the earliest
+ * start time is chosen.  If no job matches at all, the oldest running
+ * job is returned instead.  The result is running_jobs.end() only when
+ * the process has no running jobs.
+ */
+static running_job_map::iterator steve_find_running_job(steve_process *process, char token)
+{
+ /* Compare as unsigned bytes: where char is signed, a token >= 0x80
+ * is negative after integer promotion and would never equal the
+ * masked job number. */
+ const uint8_t wanted = static_cast<uint8_t>(token);
+ const running_job_map::iterator none = process->running_jobs.end();
+ running_job_map::iterator match = none;
+ running_job_map::iterator oldest = none;
+
+ /* Single pass: track the oldest matching job and the oldest job overall. */
+ /* TODO: optimize this */
+ for (auto it = process->running_jobs.begin(); it != none; ++it) {
+ if (oldest == none || it->second.start_time < oldest->second.start_time)
+ oldest = it;
+ if (static_cast<uint8_t>(it->first & 0xFF) != wanted)
+ continue;
+ if (match == none || it->second.start_time < match->second.start_time)
+ match = it;
+ }
+
+ /* No matching token? We're dealing with a bad client, just return the oldest token. */
+ return match != none ? match : oldest;
+}
+
static void steve_write(
fuse_req_t req, const char *data, size_t size, off_t off,
struct fuse_file_info *fi)
@@ -530,16 +569,36 @@ static void steve_write(
/* workaround for https://github.com/medek/nasm-rs/issues/44 */
if (state->processes[fi->fh].tokens_held == 0 && size == 1) {
assert(!state->processes[fi->fh].extra_token.has_value());
+ assert(state->processes[fi->fh].running_jobs.empty());
state->processes[fi->fh].extra_token = data[0];
- std::print(stderr, "Warning: process {} pre-released an
unacquired token 0x{:2x}, please report a bug upstream\n",
+ std::print(stderr, "Warning: process {} pre-released an
unacquired token 0x{:02x}, please report a bug upstream\n",
fi->fh, data[0]);
- } else if (state->processes[fi->fh].tokens_held <
static_cast<ssize_t>(size)) {
- std::print(stderr, "Warning: process {} tried to return {}
tokens while holding only {} tokens, capping\n",
- fi->fh, size,
state->processes[fi->fh].tokens_held);
- if (state->processes[fi->fh].tokens_held < 0)
- size = 0;
- else
- size = state->processes[fi->fh].tokens_held;
+ } else {
+ if (state->processes[fi->fh].tokens_held <
static_cast<ssize_t>(size)) {
+ std::print(stderr, "Warning: process {} tried to return
{} tokens while holding only {} tokens, capping\n",
+ fi->fh, size,
state->processes[fi->fh].tokens_held);
+ if (state->processes[fi->fh].tokens_held < 0)
+ size = 0;
+ else
+ size = state->processes[fi->fh].tokens_held;
+ }
+
+ /* Finish the running jobs */
+ std::chrono::time_point<std::chrono::steady_clock> current_time
=
+ std::chrono::steady_clock::now();
+ for (const char *token = data; token < data + size; ++token) {
+ auto it =
steve_find_running_job(&state->processes[fi->fh], *token);
+
+ if (static_cast<uint8_t>(it->first & 0xFF) != *token) {
+ std::print(stderr, "Warning: process {}
returned incorrect token value 0x{:02x}, please report a bug upstream\n",
+ fi->fh, *token);
+ }
+ if (state->verbose)
+ std::print(stderr, "PID {} job 0x{:02x}
finished after {}\n", fi->fh, it->first,
+
std::chrono::duration<double>(current_time - it->second.start_time));
+
+ state->processes[fi->fh].running_jobs.erase(it);
+ }
}
if (size == 0) {
fuse_reply_err(req, ENOSPC);
@@ -730,11 +789,16 @@ static const struct cuse_lowlevel_ops steve_ops = {
static void steve_handle_sigusr1(evutil_socket_t, short, void *userdata) {
steve_state *state = static_cast<steve_state *>(userdata);
+ std::chrono::time_point<std::chrono::steady_clock> current_time =
+ std::chrono::steady_clock::now(); /* sampled once so every job below is measured against the same instant */
std::print(stderr, "steve: currently {} tokens available out of {}\n",
state->tokens, state->jobs);
- for (auto &it : state->processes) {
- std::print(stderr, "PID {} holds {} tokens\n", it.first,
it.second.tokens_held);
+ for (const auto &it : state->processes) {
+ std::print(stderr, "PID {} holds {} tokens:\n", it.first,
it.second.tokens_held); /* one header line per tracked process */
+ for (const auto &jt : it.second.running_jobs)
+ std::print(stderr, " job 0x{:02x} running for {}\n",
jt.first,
+ std::chrono::duration<double>(current_time -
jt.second.start_time)); /* elapsed time since the job's recorded start_time, as a floating-point duration */
}
}