commit:     b50a466eae81adab400a137428144a6bba085220
Author:     Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Wed Dec 24 19:50:48 2025 +0000
Commit:     Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Wed Dec 24 19:50:48 2025 +0000
URL:        https://gitweb.gentoo.org/proj/steve.git/commit/?id=b50a466e

Initial support for limiting memory usage

Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>

 src/steve.cxx | 176 ++++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 134 insertions(+), 42 deletions(-)

diff --git a/src/steve.cxx b/src/steve.cxx
index a9239ba..b357ba0 100644
--- a/src/steve.cxx
+++ b/src/steve.cxx
@@ -101,6 +101,14 @@ struct steve_process {
        }
 };
 
+enum class steve_token_availability { /* outcome of a token request, as returned by steve_can_give_token() */
+       available, /* no limit was hit, a token may be handed out */
+       no_tokens, /* reported as: no free job token */
+       load_exceeded, /* load average check failed, a recheck timer is armed */
+       per_process_limit_exceeded, /* reported as: requester exceeded the per-process token limit */
+       memory_use_exceeded, /* memory_avail is below min_memory_avail, a recheck timer is armed */
+};
+
 struct steve_state {
        int retval{0};
        const char *dev_name{"steve"};
@@ -108,28 +116,24 @@ struct steve_state {
        int64_t jobs;
        int64_t min_jobs{1};
        int64_t per_process_limit;
-       double max_load_avg{-1};  /* < 0 implies no load average */
-       double load_avg;
+       double max_load_avg{-1};  /* <= 0 implies no load average */
+       double load_avg{-1};
+       int64_t min_memory_avail{-1}; /* <= 0 implies none */
+       int64_t memory_avail{-1};
        int64_t tokens;
        std::deque<steve_waiter> waiters;
        std::unordered_map<uint64_t, steve_process> processes;
        struct event_base *evb;
        int loadavg_fd{-2};
+       int meminfo_fd{-2};
        struct timeval recheck_timeout{0, 500000};
-       bool recheck_triggered{false};
+       std::optional<steve_token_availability> recheck_triggered;
        event_ptr recheck_event{nullptr, event_free};
        struct fuse_session *session;
        /* keep a global buffer as an optimization */
        struct fuse_buf buf{};
 };
 
-enum class steve_token_availability {
-       available,
-       no_tokens,
-       load_exceeded,
-       per_process_limit_exceeded,
-};
-
 static void steve_get_load(steve_state *state)
 {
        if (state->loadavg_fd == -2) {
@@ -181,6 +185,48 @@ static void steve_get_load(steve_state *state)
        }
 }
 
+static void steve_get_memory_use(steve_state *state)
+{ /* Refresh state->memory_avail (in MiB) from the MemAvailable line of /proc/meminfo; store -1 when the value cannot be obtained. */
+       if (state->meminfo_fd == -2) { /* -2 means no open attempt has been made yet */
+               state->meminfo_fd = open("/proc/meminfo", O_RDONLY);
+               if (state->meminfo_fd == -1)
+                       perror("Unable to open /proc/meminfo, memory use will 
not be available");
+       }
+
+       if (state->meminfo_fd != -1) {
+               char buf[4096] = {"\n"}; /* the leading newline lets the label below match even at the very start of the file */
+               ssize_t rd = pread(state->meminfo_fd, &buf[1], sizeof(buf) - 2, 
0);
+
+               if (rd >= 0) {
+                       buf[rd + 1] = 0; /* NUL-terminate right after the bytes read */
+
+                       constexpr char mem_avail_label[] = "\nMemAvailable:";
+                       char *mem_avail = strstr(buf, mem_avail_label);
+                       if (mem_avail) {
+                               mem_avail += sizeof(mem_avail_label) - 1; /* step over the label; sizeof also counts the NUL */
+                               mem_avail += strspn(mem_avail, " "); /* skip the padding spaces before the number */
+                               char *end = mem_avail + strspn(mem_avail, 
"0123456789");
+                               if (!strncmp(end, " kB\n", 4)) { /* accept only digits followed by the kB unit and end of line */
+                                       *end = 0; /* cut the buffer so that only the digits remain */
+                                       long val;
+                                       if (arg_to_long(mem_avail, &val)) {
+                                               state->memory_avail = val / 
1024;
+                                               return; /* success: kB value stored divided by 1024 */
+                                       }
+                               }
+                       }
+
+                       std::print(stderr, "Parsing /proc/meminfo failed\n");
+               } else
+                       perror("Reading /proc/meminfo failed, memory use will 
not be available");
+
+               close(state->meminfo_fd); /* reached only after a read or parse failure */
+               state->meminfo_fd = -1; /* -1 keeps later calls from reopening or rereading the file */
+       }
+
+       state->memory_avail = -1; /* NOTE(review): steve_can_give_token treats memory_avail below min_memory_avail as exceeded, so -1 also blocks tokens; confirm this is intended */
+}
+
 static steve_token_availability steve_can_give_token(steve_state *state, 
uint64_t pid)
 {
        /* if there is a token reserved, we give it immediately (even if load 
is exceeded now) */
@@ -208,11 +254,24 @@ static steve_token_availability 
steve_can_give_token(steve_state *state, uint64_
                        if (evtimer_add(state->recheck_event.get(), 
&state->recheck_timeout) == -1)
                                std::print(stderr, "failed to enable recheck 
timer\n");
                        else
-                               state->recheck_triggered = true;
+                               state->recheck_triggered = 
steve_token_availability::load_exceeded;
 
                        return steve_token_availability::load_exceeded;
                }
        }
+       if (state->min_memory_avail > 0) {
+               steve_get_memory_use(state);
+               if (state->memory_avail < state->min_memory_avail) {
+                       /* trigger a recheck if we don't have one now */
+                       assert(!state->recheck_triggered);
+                       if (evtimer_add(state->recheck_event.get(), 
&state->recheck_timeout) == -1)
+                               std::print(stderr, "failed to enable recheck 
timer\n");
+                       else
+                               state->recheck_triggered = 
steve_token_availability::memory_use_exceeded;
+
+                       return steve_token_availability::memory_use_exceeded;
+               }
+       }
 
        return steve_token_availability::available;
 }
@@ -259,16 +318,21 @@ static std::string steve_token_stats(
        const steve_state *state,
        const steve_process *process,
        bool include_reserved = true,
-       bool include_load = true)
+       bool include_load_mem = true)
 {
        assert(state);
        assert(process);
 
        std::string ret = std::format("{} left, {}",
                state->tokens, steve_process_token_stats(process, 
include_reserved));
-       if (include_load && state->max_load_avg > 0)
-               ret += std::format(", load average = {:.3} (limit: {})",
-                       state->load_avg, state->max_load_avg);
+       if (include_load_mem) {
+               if (state->max_load_avg > 0)
+                       ret += std::format(", load average = {:.3} (limit: {})",
+                               state->load_avg, state->max_load_avg);
+               if (state->min_memory_avail > 0)
+                       ret += std::format(", memory available = {} MiB (min: 
{} MiB)",
+                               state->memory_avail, state->min_memory_avail);
+       }
        return ret;
 }
 
@@ -407,11 +471,13 @@ static void steve_init(void *userdata, struct 
fuse_conn_info *)
        state->tokens = state->jobs;
 
        std::print(stderr, "steve running on /dev/{} for {} jobs\n", 
state->dev_name, state->jobs);
-       if (state->max_load_avg > 0) {
+       if (state->max_load_avg > 0)
                std::print(stderr, "  tokens will be served with load average < 
{:.3}\n", state->max_load_avg);
+       if (state->min_memory_avail > 0)
+               std::print(stderr, "  tokens will be served with memory 
available >= {} MiB\n", state->min_memory_avail);
+       if (state->max_load_avg > 0 || state->min_memory_avail > 0)
                std::print(stderr, "  with a recheck timeout of {} s {} us\n",
                                state->recheck_timeout.tv_sec, 
state->recheck_timeout.tv_usec);
-       }
        if (state->min_jobs > 0)
                std::print(stderr, "  at least {} jobs will be always 
available\n", state->min_jobs);
        if (state->per_process_limit > 0)
@@ -529,6 +595,41 @@ static void steve_interrupt(fuse_req_t req, void *userdata)
        }
 }
 
+static void steve_explain_no_token( /* Print to stderr why no token was handed out for a request. */
+       steve_token_availability token_avail, /* the reason to report; must not be available */
+       const steve_state *state, /* source of the token count and the load and memory figures */
+       uint64_t pid, /* requester id, forwarded to steve_process_id() */
+       const steve_process *process) /* requester record used for the per-process statistics */
+{
+       switch (token_avail) {
+               case steve_token_availability::load_exceeded:
+                       std::print(stderr, "Load exceeded while {} requested 
token, waiting, {} tokens free, "
+                                       "{}, load average {:.3} >= {}\n",
+                                       steve_process_id(pid, process), 
state->tokens,
+                                       steve_process_token_stats(process, 
false),
+                                       state->load_avg, state->max_load_avg);
+                       break;
+               case steve_token_availability::memory_use_exceeded:
+                       std::print(stderr, "Memory use exceeded while {} 
requested token, waiting, {} tokens free, "
+                                       "{}, memory available: {} MiB < {} 
MiB\n",
+                                       steve_process_id(pid, process), 
state->tokens,
+                                       steve_process_token_stats(process, 
false),
+                                       state->memory_avail, 
state->min_memory_avail);
+                       break;
+               case steve_token_availability::per_process_limit_exceeded:
+                       std::print(stderr, "{} exceeded per-process token 
limit, waiting, {} tokens free\n",
+                                       steve_process_id(pid, process), 
state->tokens);
+                       break;
+               case steve_token_availability::no_tokens:
+                       std::print(stderr, "No free job token for {}, waiting, 
{}\n",
+                                       steve_process_id(pid, process), 
steve_process_token_stats(process, false));
+                       break;
+               case steve_token_availability::available:
+                       assert(0 && "not reached"); /* callers in this patch only pass a failure reason */
+                       break;
+       }
+}
+
 static void steve_read(
        fuse_req_t req, size_t size, off_t off, struct fuse_file_info *fi)
 {
@@ -556,18 +657,8 @@ static void steve_read(
        }
 
        state->waiters.emplace_back(steve_waiter{req, fi->fh});
-       if (state->verbose) {
-               steve_process *process = &state->processes.at(fi->fh);
-               if (token_avail == steve_token_availability::load_exceeded) {
-                       std::print(stderr, "Load exceeded while {} requested 
token, waiting, {} tokens free, "
-                                       "{}, load average {:.3} >= {}\n",
-                                       steve_process_id(fi->fh, process), 
state->tokens,
-                                       steve_process_token_stats(process, 
false),
-                                       state->load_avg, state->max_load_avg);
-               } else
-                       std::print(stderr, "No free job token for {}, waiting, 
{}\n",
-                                       steve_process_id(fi->fh, process), 
steve_process_token_stats(process, false));
-       }
+       if (state->verbose)
+               steve_explain_no_token(token_avail, state, fi->fh, 
&state->processes.at(fi->fh));
        fuse_req_interrupt_func(req, steve_interrupt, state);
 }
 
@@ -679,19 +770,13 @@ static void steve_poll(
        }
 
        if (state->verbose) {
-               steve_process *process = &state->processes.at(fi->fh);
-               if (token_avail == steve_token_availability::load_exceeded) {
-                       assert(state->max_load_avg > 0);
-                       /* capped by load average */
-                       std::print(stderr, "Load exceeded while {} requested 
token, waiting, {} tokens free, "
-                                       "{}, load average {:.3} >= {}\n",
-                                       steve_process_id(fi->fh, process), 
state->tokens,
-                                       steve_process_token_stats(process, 
false),
-                                       state->load_avg, state->max_load_avg);
-               } else
+               const steve_process *process = &state->processes.at(fi->fh);
+               if (token_avail == steve_token_availability::available)
                        std::print(stderr, "{} requested poll, {} tokens 
available, {}\n",
                                        steve_process_id(fi->fh, process), 
state->tokens,
                                        steve_process_token_stats(process, 
false));
+               else
+                       steve_explain_no_token(token_avail, state, fi->fh, 
process);
        }
 
        fuse_reply_poll(req, events);
@@ -876,7 +961,7 @@ static void steve_handle_cuse(evutil_socket_t, short, void 
*userdata) {
 
 static void steve_handle_recheck(evutil_socket_t, short, void *userdata) {
        steve_state *state = static_cast<steve_state *>(userdata);
-       state->recheck_triggered = false;
+       state->recheck_triggered.reset(); /* empty the optional before waking waiters; presumably this runs as the recheck_event callback, confirm against the registration site */
        steve_wake_waiters(state);
 }
 
@@ -940,6 +1025,9 @@ static constexpr char steve_usage[] =
 "    --load-recheck-timeout=TIMEOUT, -r TIMEOUT\n"
 "                           timeout for throttling due to exceeded load, in 
sec\n"
 "                           (fractional down to usec, default: 0.5)\n"
+"    --min-memory=MIN_MEM, -M MIN_MEM\n"
+"                           do not serve tokens available memory is above\n"
+"                           MIN_MEM, in MiB\n"
 "    --min-jobs=MIN_JOBS, -m MIN_JOBS\n"
 "                           min. jobs to serve even if load average is 
exceeded\n"
 "                           (default: 1)\n"
@@ -962,6 +1050,7 @@ static const struct option steve_long_opts[] = {
        { "jobs", required_argument, 0, 'j' },
        { "load-average", required_argument, 0, 'l' },
        { "load-recheck-timeout", required_argument, 0, 'r' },
+       { "min-memory", required_argument, 0, 'M' },
        { "min-jobs", required_argument, 0, 'm' },
        { "per-process-limit", required_argument, 0, 'p' },
        { "dev-name", required_argument, 0, steve_long_option::dev_name },
@@ -971,7 +1060,7 @@ static const struct option steve_long_opts[] = {
        {},
 };
 
-static const char *steve_short_opts = "hVj:l:r:m:p:u:vd";
+static const char *steve_short_opts = "hVj:l:r:m:M:p:u:vd"; /* M: is the short form of --min-memory and takes an argument */
 
 int main(int argc, char **argv)
 {
@@ -990,17 +1079,20 @@ int main(int argc, char **argv)
                                return 0;
                        case 'j':
                        case 'm':
+                       case 'M':
                        case 'p':
                                {
                                        long jobs_arg;
                                        if (!arg_to_long(optarg, &jobs_arg)) {
-                                               std::print(stderr, "invalid job 
number: {}\n", optarg);
+                                               std::print(stderr, "invalid 
argument: {}\n", optarg);
                                                return 1;
                                        }
                                        if (opt == 'j')
                                                state.jobs = jobs_arg;
                                        else if (opt == 'm')
                                                state.min_jobs = jobs_arg;
+                                       else if (opt == 'M')
+                                               state.min_memory_avail = 
jobs_arg;
                                        else if (opt == 'p')
                                                state.per_process_limit = 
jobs_arg;
                                        else

Reply via email to