commit:     6fbb8e659ca195c5769485c518c4d5542d1fb0e1
Author:     Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Tue Dec  2 13:43:14 2025 +0000
Commit:     Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Fri Dec 26 13:42:19 2025 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=6fbb8e65

System jobserver support

Add FEATURES=jobserver-token that causes Portage to acquire a job token
from a jobserver (if one is running) for every started task, and release
it once it finishes.  Note that this does not implement the implicit
slot feature, since it is meant to work with the system-wide jobserver,
under the assumption that emerge will be run directly by the user
and therefore no job token will be taken for the emerge invocation
itself.

Bug: https://bugs.gentoo.org/966879
Part-of: https://github.com/gentoo/portage/pull/1528
Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>

 lib/_emerge/Scheduler.py | 97 +++++++++++++++++++++++++++++++++++++++++++++++-
 lib/portage/const.py     |  1 +
 man/make.conf.5          |  4 ++
 3 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index 30cdaff6bb..fa45b50c0f 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -245,6 +245,10 @@ class Scheduler(PollScheduler):
         for root in self.trees:
             self._config_pool[root] = []
 
+        features = self.settings.features
+        self._jobserver_fd = None
+        self._jobserver_tokens = {}
+
         self._fetch_log = os.path.join(
             _emerge.emergelog._emerge_log_dir, "emerge-fetch.log"
         )
@@ -263,6 +267,7 @@ class Scheduler(PollScheduler):
         self._pkg_queue = []
         self._jobs = 0
         self._running_tasks = {}
+        self._job_tokens = {}
         self._completed_tasks = set()
         self._main_exit = None
         self._main_loadavg_handle = None
@@ -302,7 +307,6 @@ class Scheduler(PollScheduler):
         # jobs completes.
         self._choose_pkg_return_early = False
 
-        features = self.settings.features
         if "parallel-fetch" in features and not (
             "--pretend" in self.myopts
             or "--fetch-all-uri" in self.myopts
@@ -384,6 +388,7 @@ class Scheduler(PollScheduler):
                 # will never be started, so purged it from
                 # self._running_tasks so that it won't keep the main
                 # loop running.
+                self._release_job_token(id(task))
                 del self._running_tasks[id(task)]
 
         for q in self._task_queues.values():
@@ -1540,6 +1545,7 @@ class Scheduler(PollScheduler):
 
     def _build_exit(self, build):
         self._running_tasks.pop(id(build), None)
+        self._release_job_token(id(build))
         if build.returncode == os.EX_OK and self._terminated_tasks:
             # We've been interrupted, so we won't
             # add this to the merge queue.
@@ -1552,6 +1558,7 @@ class Scheduler(PollScheduler):
                 merge=build,
                 scheduler=self._sched_iface,
             )
+            # move the job token to the merge task
             self._running_tasks[id(merge)] = merge
             # By default, merge-wait only allows merge when no builds are executing.
             # As a special exception, dependencies on system packages are frequently
@@ -1601,6 +1608,32 @@ class Scheduler(PollScheduler):
     def _main_loop(self):
         self._main_exit = self._event_loop.create_future()
 
+        if "jobserver-token" in self.settings.features:
+            makeflags = self.settings.get("MAKEFLAGS", "").split()
+            jobserver_path = None
+            for flag in makeflags:
+                if flag.startswith("--jobserver-auth="):
+                    value = flag.removeprefix("--jobserver-auth=")
+                    if value.startswith("fifo:"):
+                        jobserver_path = value.removeprefix("fifo:")
+                    else:
+                        jobserver_path = None
+                        print()
+                        out = portage.output.EOutput()
+                        out.ewarn(f"Unsupported jobserver type: {flag}")
+            if jobserver_path is not None:
+                try:
+                    self._jobserver_fd = os.open(
+                        jobserver_path, os.O_RDWR | os.O_NONBLOCK
+                    )
+                except OSError as exception:
+                    out = portage.output.EOutput()
+                    print()
+                    out.eerror("")
+                    out.eerror(f"Unable to connect to jobserver: {exception}")
+                    out.eerror("")
+                    self.terminate()
+
         if (
             self._max_load is not None
             and self._loadavg_latency is not None
@@ -1692,6 +1725,9 @@ class Scheduler(PollScheduler):
         if self._schedule_merge_wakeup_task is not None:
             self._schedule_merge_wakeup_task.cancel()
             self._schedule_merge_wakeup_task = None
+        if self._jobserver_fd is not None:
+            os.close(self._jobserver_fd)
+            self._jobserver_fd = None
 
     def _choose_pkg(self):
         """
@@ -2052,6 +2088,55 @@ class Scheduler(PollScheduler):
 
         return False
 
+    def _acquire_job_token(self) -> str:
+        """
+        Acquire a job token. Returns the token if available, or an empty value
+        if no jobserver is used (or the jobserver died). Raises BlockingIOError
+        if no tokens are available.
+        """
+        if self._jobserver_fd is None:
+            return b""
+        try:
+            return os.read(self._jobserver_fd, 1)
+        except BlockingIOError:
+            raise
+        except OSError as exception:
+            # If something failed (say, jobserver died), disable its usage
+            # and terminate the build (as sub-builds will likely fail too).
+            os.close(self._jobserver_fd)
+            self._jobserver_fd = None
+            out = portage.output.EOutput()
+            print()
+            out.eerror("")
+            out.eerror(f"Jobserver I/O failed, terminating: {exception}")
+            out.eerror("")
+            self.terminate()
+            return b""
+
+    def _release_job_token(self, task_id: int) -> None:
+        """
+        Release a job token for given task. If no jobserver is running, does
+        nothing.
+        """
+        token = self._jobserver_tokens.pop(task_id, None)
+        if token is not None and self._jobserver_fd is not None:
+            try:
+                os.write(self._jobserver_fd, token)
+            except OSError as exception:
+                # If something failed (say, jobserver died), disable its usage
+                # and terminate the build (as sub-builds will likely fail too).
+                os.close(self._jobserver_fd)
+                self._jobserver_fd = None
+                out = portage.output.EOutput()
+                out.eerror("")
+                out.eerror(f"Jobserver I/O failed, terminating: {exception}")
+                out.eerror("")
+                self.terminate()
+
+    def _unblock_jobs(self) -> None:
+        self._sched_iface.remove_reader(self._jobserver_fd)
+        self._schedule()
+
     def _schedule_tasks_imp(self):
         """
         @rtype: bool
@@ -2077,6 +2162,15 @@ class Scheduler(PollScheduler):
             if pkg is None:
                 return state_change
 
+            if not pkg.installed:
+                try:
+                    token = self._acquire_job_token()
+                except BlockingIOError:
+                    # no job tokens available right now
+                    self._pkg_queue.insert(0, pkg)
+                    self._sched_iface.add_reader(self._jobserver_fd, self._unblock_jobs)
+                    return state_change
+
             state_change = True
 
             if not pkg.installed:
@@ -2094,6 +2188,7 @@ class Scheduler(PollScheduler):
                 self._jobs += 1
                 self._previous_job_start_time = time.time()
                 self._status_display.running = self._jobs
+                self._jobserver_tokens[id(task)] = token
                 self._running_tasks[id(task)] = task
                 task.scheduler = self._sched_iface
                 self._task_queues.jobs.add(task)

diff --git a/lib/portage/const.py b/lib/portage/const.py
index ecb897bb0a..900c30767c 100644
--- a/lib/portage/const.py
+++ b/lib/portage/const.py
@@ -193,6 +193,7 @@ SUPPORTED_FEATURES = frozenset(
         "icecream",
         "installsources",
         "ipc-sandbox",
+        "jobserver-token",
         "keeptemp",
         "keepwork",
         "lmirror",

diff --git a/man/make.conf.5 b/man/make.conf.5
index b7505332a1..3d610b1406 100644
--- a/man/make.conf.5
+++ b/man/make.conf.5
@@ -568,6 +568,10 @@ and \fBbinpkg-dostrip\fR is enabled.
 Isolate the ebuild phase functions from host IPC namespace. Supported
 only on Linux. Requires IPC namespace support in kernel.
 .TP
+.B jobserver\-token
+Acquire a job token for every build from the jobserver if one is found running.
+This feature is experimental and its semantics may change.
+.TP
 .B keeptemp
 Do not delete the ${T} directory after the merge process.
 .TP

Reply via email to