commit: b50a466eae81adab400a137428144a6bba085220
Author: Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Wed Dec 24 19:50:48 2025 +0000
Commit: Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Wed Dec 24 19:50:48 2025 +0000
URL: https://gitweb.gentoo.org/proj/steve.git/commit/?id=b50a466e
Initial support for limiting memory usage
Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>
src/steve.cxx | 176 ++++++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 134 insertions(+), 42 deletions(-)
diff --git a/src/steve.cxx b/src/steve.cxx
index a9239ba..b357ba0 100644
--- a/src/steve.cxx
+++ b/src/steve.cxx
@@ -101,6 +101,14 @@ struct steve_process {
}
};
+enum class steve_token_availability {
+ available,
+ no_tokens,
+ load_exceeded,
+ per_process_limit_exceeded,
+ memory_use_exceeded,
+};
+
struct steve_state {
int retval{0};
const char *dev_name{"steve"};
@@ -108,28 +116,24 @@ struct steve_state {
int64_t jobs;
int64_t min_jobs{1};
int64_t per_process_limit;
- double max_load_avg{-1}; /* < 0 implies no load average */
- double load_avg;
+ double max_load_avg{-1}; /* <= 0 implies no load average */
+ double load_avg{-1};
+ int64_t min_memory_avail{-1}; /* <= 0 implies none */
+ int64_t memory_avail{-1};
int64_t tokens;
std::deque<steve_waiter> waiters;
std::unordered_map<uint64_t, steve_process> processes;
struct event_base *evb;
int loadavg_fd{-2};
+ int meminfo_fd{-2};
struct timeval recheck_timeout{0, 500000};
- bool recheck_triggered{false};
+ std::optional<steve_token_availability> recheck_triggered;
event_ptr recheck_event{nullptr, event_free};
struct fuse_session *session;
/* keep a global buffer as an optimization */
struct fuse_buf buf{};
};
-enum class steve_token_availability {
- available,
- no_tokens,
- load_exceeded,
- per_process_limit_exceeded,
-};
-
static void steve_get_load(steve_state *state)
{
if (state->loadavg_fd == -2) {
@@ -181,6 +185,48 @@ static void steve_get_load(steve_state *state)
}
}
+static void steve_get_memory_use(steve_state *state)
+{
+ if (state->meminfo_fd == -2) {
+ state->meminfo_fd = open("/proc/meminfo", O_RDONLY);
+ if (state->meminfo_fd == -1)
+ perror("Unable to open /proc/meminfo, memory use will
not be available");
+ }
+
+ if (state->meminfo_fd != -1) {
+ char buf[4096] = {"\n"};
+ ssize_t rd = pread(state->meminfo_fd, &buf[1], sizeof(buf) - 2,
0);
+
+ if (rd >= 0) {
+ buf[rd + 1] = 0;
+
+ constexpr char mem_avail_label[] = "\nMemAvailable:";
+ char *mem_avail = strstr(buf, mem_avail_label);
+ if (mem_avail) {
+ mem_avail += sizeof(mem_avail_label) - 1;
+ mem_avail += strspn(mem_avail, " ");
+ char *end = mem_avail + strspn(mem_avail,
"0123456789");
+ if (!strncmp(end, " kB\n", 4)) {
+ *end = 0;
+ long val;
+ if (arg_to_long(mem_avail, &val)) {
+ state->memory_avail = val /
1024;
+ return;
+ }
+ }
+ }
+
+ std::print(stderr, "Parsing /proc/meminfo failed\n");
+ } else
+ perror("Reading /proc/meminfo failed, memory use will
not be available");
+
+ close(state->meminfo_fd);
+ state->meminfo_fd = -1;
+ }
+
+ state->memory_avail = -1;
+}
+
static steve_token_availability steve_can_give_token(steve_state *state,
uint64_t pid)
{
/* if there is a token reserved, we give it immediately (even if load
is exceeded now) */
@@ -208,11 +254,24 @@ static steve_token_availability
steve_can_give_token(steve_state *state, uint64_
if (evtimer_add(state->recheck_event.get(),
&state->recheck_timeout) == -1)
std::print(stderr, "failed to enable recheck
timer\n");
else
- state->recheck_triggered = true;
+ state->recheck_triggered =
steve_token_availability::load_exceeded;
return steve_token_availability::load_exceeded;
}
}
+ if (state->min_memory_avail > 0) {
+ steve_get_memory_use(state);
+ if (state->memory_avail < state->min_memory_avail) {
+ /* trigger a recheck if we don't have one now */
+ assert(!state->recheck_triggered);
+ if (evtimer_add(state->recheck_event.get(),
&state->recheck_timeout) == -1)
+ std::print(stderr, "failed to enable recheck
timer\n");
+ else
+ state->recheck_triggered =
steve_token_availability::memory_use_exceeded;
+
+ return steve_token_availability::memory_use_exceeded;
+ }
+ }
return steve_token_availability::available;
}
@@ -259,16 +318,21 @@ static std::string steve_token_stats(
const steve_state *state,
const steve_process *process,
bool include_reserved = true,
- bool include_load = true)
+ bool include_load_mem = true)
{
assert(state);
assert(process);
std::string ret = std::format("{} left, {}",
state->tokens, steve_process_token_stats(process,
include_reserved));
- if (include_load && state->max_load_avg > 0)
- ret += std::format(", load average = {:.3} (limit: {})",
- state->load_avg, state->max_load_avg);
+ if (include_load_mem) {
+ if (state->max_load_avg > 0)
+ ret += std::format(", load average = {:.3} (limit: {})",
+ state->load_avg, state->max_load_avg);
+ if (state->min_memory_avail > 0)
+ ret += std::format(", memory available = {} MiB (min:
{} MiB)",
+ state->memory_avail, state->min_memory_avail);
+ }
return ret;
}
@@ -407,11 +471,13 @@ static void steve_init(void *userdata, struct
fuse_conn_info *)
state->tokens = state->jobs;
std::print(stderr, "steve running on /dev/{} for {} jobs\n",
state->dev_name, state->jobs);
- if (state->max_load_avg > 0) {
+ if (state->max_load_avg > 0)
std::print(stderr, " tokens will be served with load average <
{:.3}\n", state->max_load_avg);
+ if (state->min_memory_avail > 0)
+ std::print(stderr, " tokens will be served with memory
available >= {} MiB\n", state->min_memory_avail);
+ if (state->max_load_avg > 0 || state->min_memory_avail > 0)
std::print(stderr, " with a recheck timeout of {} s {} us\n",
state->recheck_timeout.tv_sec,
state->recheck_timeout.tv_usec);
- }
if (state->min_jobs > 0)
std::print(stderr, " at least {} jobs will be always
available\n", state->min_jobs);
if (state->per_process_limit > 0)
@@ -529,6 +595,41 @@ static void steve_interrupt(fuse_req_t req, void *userdata)
}
}
+static void steve_explain_no_token(
+ steve_token_availability token_avail,
+ const steve_state *state,
+ uint64_t pid,
+ const steve_process *process)
+{
+ switch (token_avail) {
+ case steve_token_availability::load_exceeded:
+ std::print(stderr, "Load exceeded while {} requested
token, waiting, {} tokens free, "
+ "{}, load average {:.3} >= {}\n",
+ steve_process_id(pid, process),
state->tokens,
+ steve_process_token_stats(process,
false),
+ state->load_avg, state->max_load_avg);
+ break;
+ case steve_token_availability::memory_use_exceeded:
+ std::print(stderr, "Memory use exceeded while {}
requested token, waiting, {} tokens free, "
+ "{}, memory available: {} MiB < {}
MiB\n",
+ steve_process_id(pid, process),
state->tokens,
+ steve_process_token_stats(process,
false),
+ state->memory_avail,
state->min_memory_avail);
+ break;
+ case steve_token_availability::per_process_limit_exceeded:
+ std::print(stderr, "{} exceeded per-process token
limit, waiting, {} tokens free\n",
+ steve_process_id(pid, process),
state->tokens);
+ break;
+ case steve_token_availability::no_tokens:
+ std::print(stderr, "No free job token for {}, waiting,
{}\n",
+ steve_process_id(pid, process),
steve_process_token_stats(process, false));
+ break;
+ case steve_token_availability::available:
+ assert(0 && "not reached");
+ break;
+ }
+}
+
static void steve_read(
fuse_req_t req, size_t size, off_t off, struct fuse_file_info *fi)
{
@@ -556,18 +657,8 @@ static void steve_read(
}
state->waiters.emplace_back(steve_waiter{req, fi->fh});
- if (state->verbose) {
- steve_process *process = &state->processes.at(fi->fh);
- if (token_avail == steve_token_availability::load_exceeded) {
- std::print(stderr, "Load exceeded while {} requested
token, waiting, {} tokens free, "
- "{}, load average {:.3} >= {}\n",
- steve_process_id(fi->fh, process),
state->tokens,
- steve_process_token_stats(process,
false),
- state->load_avg, state->max_load_avg);
- } else
- std::print(stderr, "No free job token for {}, waiting,
{}\n",
- steve_process_id(fi->fh, process),
steve_process_token_stats(process, false));
- }
+ if (state->verbose)
+ steve_explain_no_token(token_avail, state, fi->fh,
&state->processes.at(fi->fh));
fuse_req_interrupt_func(req, steve_interrupt, state);
}
@@ -679,19 +770,13 @@ static void steve_poll(
}
if (state->verbose) {
- steve_process *process = &state->processes.at(fi->fh);
- if (token_avail == steve_token_availability::load_exceeded) {
- assert(state->max_load_avg > 0);
- /* capped by load average */
- std::print(stderr, "Load exceeded while {} requested
token, waiting, {} tokens free, "
- "{}, load average {:.3} >= {}\n",
- steve_process_id(fi->fh, process),
state->tokens,
- steve_process_token_stats(process,
false),
- state->load_avg, state->max_load_avg);
- } else
+ const steve_process *process = &state->processes.at(fi->fh);
+ if (token_avail == steve_token_availability::available)
std::print(stderr, "{} requested poll, {} tokens
available, {}\n",
steve_process_id(fi->fh, process),
state->tokens,
steve_process_token_stats(process,
false));
+ else
+ steve_explain_no_token(token_avail, state, fi->fh,
process);
}
fuse_reply_poll(req, events);
@@ -876,7 +961,7 @@ static void steve_handle_cuse(evutil_socket_t, short, void
*userdata) {
static void steve_handle_recheck(evutil_socket_t, short, void *userdata) {
steve_state *state = static_cast<steve_state *>(userdata);
- state->recheck_triggered = false;
+ state->recheck_triggered.reset();
steve_wake_waiters(state);
}
@@ -940,6 +1025,9 @@ static constexpr char steve_usage[] =
" --load-recheck-timeout=TIMEOUT, -r TIMEOUT\n"
" timeout for throttling due to exceeded load, in
sec\n"
" (fractional down to usec, default: 0.5)\n"
+" --min-memory=MIN_MEM, -M MIN_MEM\n"
+" do not serve tokens available memory is above\n"
+" MIN_MEM, in MiB\n"
" --min-jobs=MIN_JOBS, -m MIN_JOBS\n"
" min. jobs to serve even if load average is
exceeded\n"
" (default: 1)\n"
@@ -962,6 +1050,7 @@ static const struct option steve_long_opts[] = {
{ "jobs", required_argument, 0, 'j' },
{ "load-average", required_argument, 0, 'l' },
{ "load-recheck-timeout", required_argument, 0, 'r' },
+ { "min-memory", required_argument, 0, 'M' },
{ "min-jobs", required_argument, 0, 'm' },
{ "per-process-limit", required_argument, 0, 'p' },
{ "dev-name", required_argument, 0, steve_long_option::dev_name },
@@ -971,7 +1060,7 @@ static const struct option steve_long_opts[] = {
{},
};
-static const char *steve_short_opts = "hVj:l:r:m:p:u:vd";
+static const char *steve_short_opts = "hVj:l:r:m:M:p:u:vd";
int main(int argc, char **argv)
{
@@ -990,17 +1079,20 @@ int main(int argc, char **argv)
return 0;
case 'j':
case 'm':
+ case 'M':
case 'p':
{
long jobs_arg;
if (!arg_to_long(optarg, &jobs_arg)) {
- std::print(stderr, "invalid job
number: {}\n", optarg);
+ std::print(stderr, "invalid
argument: {}\n", optarg);
return 1;
}
if (opt == 'j')
state.jobs = jobs_arg;
else if (opt == 'm')
state.min_jobs = jobs_arg;
+ else if (opt == 'M')
+ state.min_memory_avail =
jobs_arg;
else if (opt == 'p')
state.per_process_limit =
jobs_arg;
else