commit: 41b4077e9950cdd6f79b57b463788468530a7fc0
Author: Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Fri Nov 28 18:58:49 2025 +0000
Commit: Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Fri Nov 28 18:58:49 2025 +0000
URL: https://gitweb.gentoo.org/proj/steve.git/commit/?id=41b4077e
Initial support for load-average limiting
Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>
steve.cxx | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 84 insertions(+), 16 deletions(-)
diff --git a/steve.cxx b/steve.cxx
index a6ed034..50d9f50 100644
--- a/steve.cxx
+++ b/steve.cxx
@@ -68,6 +68,8 @@ struct steve_process {
struct steve_state {
bool verbose;
size_t jobs;
+ double max_load_avg{-1}; /* < 0 implies no load average */
+ double load_avg;
size_t tokens;
std::deque<steve_read_waiter> read_waiters;
std::deque<steve_poll_waiter> poll_waiters;
@@ -80,25 +82,54 @@ struct steve_state {
struct fuse_buf buf{};
};
+enum class steve_token_availability {
+ available,
+ no_tokens,
+ load_exceeded,
+};
+
+static steve_token_availability steve_can_give_token(steve_state *state)
+{
+ if (state->tokens == 0)
+ return steve_token_availability::no_tokens;
+ if (state->max_load_avg > 0) {
+ if (getloadavg(&state->load_avg, 1) == -1) {
+ static bool warned = false;
+ if (!warned) {
+ perror("getloadavg() failed, will ignore
(further warnings will be suppressed)");
+ warned = true;
+ }
+ } else if (state->load_avg > state->max_load_avg)
+ return steve_token_availability::load_exceeded;
+ }
+
+ return steve_token_availability::available;
+}
+
static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
{
state->tokens--;
state->processes[pid].tokens_held++;
- if (state->verbose)
- std::print(stderr, "Giving job token to PID {}, {} left, {}
tokens held by process\n",
- pid, state->tokens,
state->processes[pid].tokens_held);
+ if (state->verbose) {
+ if (state->max_load_avg > 0)
+ std::print(stderr, "Giving job token to PID {}, {}
left, {} tokens held by process, load average = {:.3} < {}\n",
+ pid, state->tokens,
state->processes[pid].tokens_held, state->load_avg, state->max_load_avg);
+ else
+ std::print(stderr, "Giving job token to PID {}, {}
left, {} tokens held by process\n",
+ pid, state->tokens,
state->processes[pid].tokens_held);
+ }
fuse_reply_buf(req, "+", 1);
}
static void steve_wake_waiters(steve_state *state)
{
- while (state->tokens > 0 && !state->read_waiters.empty()) {
+ while (!state->read_waiters.empty() && steve_can_give_token(state) ==
steve_token_availability::available) {
const steve_read_waiter *read_waiter =
&state->read_waiters.front();
steve_give_token(state, read_waiter->req, read_waiter->pid);
state->read_waiters.pop_front();
}
- if (state->tokens > 0) {
+ if (steve_can_give_token(state) == steve_token_availability::available)
{
for (auto &poll_waiter : state->poll_waiters) {
if (state->verbose)
std::print(stderr, "Notifying PID {} about
POLLIN, {} tokens left, {} tokens held by process\n",
@@ -136,6 +167,8 @@ static void steve_init(void *userdata, struct
fuse_conn_info *)
state->tokens = state->jobs;
std::print(stderr, "steve running on /dev/steve for {} jobs\n",
state->jobs);
+ if (state->max_load_avg > 0)
+ std::print(stderr, " tokens will be served with load average <
{:.3}\n", state->max_load_avg);
}
static void steve_destroy(void *userdata)
@@ -256,7 +289,8 @@ static void steve_read(
}
/* no need to support reading more than one token at a time */
- if (state->tokens > 0) {
+ steve_token_availability token_avail = steve_can_give_token(state);
+ if (token_avail == steve_token_availability::available) {
steve_give_token(state, req, fi->fh);
return;
}
@@ -267,9 +301,16 @@ static void steve_read(
}
state->read_waiters.emplace_back(steve_read_waiter{req, fi->fh});
- if (state->verbose)
- std::print(stderr, "No free job token for PID {}, waiting, {}
tokens held by process\n",
- fi->fh, state->processes[fi->fh].tokens_held);
+ if (state->verbose) {
+ if (token_avail == steve_token_availability::load_exceeded) {
+ std::print(stderr, "Load exceeded while PID {}
requested token, waiting, {} tokens free, "
+ "{} tokens held by process, load
average {:.3} >= {}\n",
+ fi->fh, state->tokens,
state->processes[fi->fh].tokens_held,
+ state->load_avg, state->max_load_avg);
+ } else
+ std::print(stderr, "No free job token for PID {},
waiting, {} tokens held by process\n",
+ fi->fh,
state->processes[fi->fh].tokens_held);
+ }
fuse_req_interrupt_func(req, steve_interrupt, state);
}
@@ -324,16 +365,26 @@ static void steve_poll(
steve_state *state = static_cast<steve_state *>(fuse_req_userdata(req));
int events = fi->poll_events & (POLLIN | POLLOUT);
- if (state->verbose)
- std::print(stderr, "PID {} requested poll, {} tokens available,
{} tokens held by process\n",
- fi->fh, state->tokens,
state->processes[fi->fh].tokens_held);
-
/* POLLOUT is always possible, POLLIN only if we have any tokens */
- if (state->tokens == 0) {
+ steve_token_availability token_avail = steve_can_give_token(state);
+ if (token_avail != steve_token_availability::available) {
state->poll_waiters.emplace_back(ph, fi->fh);
events &= ~POLLIN;
}
+ if (state->verbose) {
+ if (token_avail == steve_token_availability::load_exceeded) {
+ assert(state->max_load_avg > 0);
+ /* capped by load average */
+ std::print(stderr, "Load exceeded while PID {}
requested token, waiting, {} tokens free, "
+ "{} tokens held by process, load
average {:.3} >= {}\n",
+ fi->fh, state->tokens,
state->processes[fi->fh].tokens_held,
+ state->load_avg, state->max_load_avg);
+ } else
+ std::print(stderr, "PID {} requested poll, {} tokens
available, {} tokens held by process\n",
+ fi->fh, state->tokens,
state->processes[fi->fh].tokens_held);
+ }
+
fuse_reply_poll(req, events);
}
@@ -374,18 +425,23 @@ static constexpr char steve_usage[] =
" --help, -h print this help message\n"
" --version, -V print version\n"
" --jobs=JOBS, -j JOBS jobs to use (default: nproc)\n"
+" --load-average=LOAD_AVG, -l LOAD_AVG\n"
+" do not serve tokens unless load is below
LOAD_AVG\n"
" --verbose, -v enable verbose logging\n"
" --debug, -d enable FUSE debug output\n";
-static const struct option steve_opts[] = {
+static const struct option steve_long_opts[] = {
{ "help", no_argument, 0, 'h' },
{ "version", no_argument, 0, 'V' },
{ "jobs", required_argument, 0, 'j' },
+ { "load-average", required_argument, 0, 'l' },
{ "verbose", no_argument, 0, 'v' },
{ "debug", no_argument, 0, 'd' },
{},
};
+static const char *steve_short_opts = "hVj:l:vd";
+
struct fd_guard {
int fd;
~fd_guard() { close(fd); }
@@ -397,7 +453,7 @@ int main(int argc, char **argv)
int opt;
bool debug = false;
- while ((opt = getopt_long(argc, argv, "hVj:vd", steve_opts, nullptr))
!= -1) {
+ while ((opt = getopt_long(argc, argv, steve_short_opts,
steve_long_opts, nullptr)) != -1) {
switch (opt) {
case 'h':
std::print(steve_usage, argv[0]);
@@ -417,6 +473,18 @@ int main(int argc, char **argv)
state.jobs = jobs_arg;
}
break;
+ case 'l':
+ {
+ char *endptr;
+ errno = 0;
+ double load_avg_arg = strtod(optarg,
&endptr);
+ if (*endptr || errno == ERANGE ||
load_avg_arg < 1) {
+ std::print(stderr, "invalid
load average value (must be >=1): {}\n", optarg);
+ return 1;
+ }
+ state.max_load_avg = load_avg_arg;
+ }
+ break;
case 'v':
state.verbose = true;
break;