commit: 6fbb8e659ca195c5769485c518c4d5542d1fb0e1 Author: Michał Górny <mgorny <AT> gentoo <DOT> org> AuthorDate: Tue Dec 2 13:43:14 2025 +0000 Commit: Michał Górny <mgorny <AT> gentoo <DOT> org> CommitDate: Fri Dec 26 13:42:19 2025 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=6fbb8e65
System jobserver support Add FEATURES=jobserver-token that causes Portage to acquire a job token from a jobserver (if one is running) for every started task, and release it once it finishes. Note that this does not implement the implicit slot feature, since it is meant to work with the system-wide jobserver, under the assumption that emerge will be run directly by the user and therefore no job token will be taken for the emerge invocation itself. Bug: https://bugs.gentoo.org/966879 Part-of: https://github.com/gentoo/portage/pull/1528 Signed-off-by: Michał Górny <mgorny <AT> gentoo.org> lib/_emerge/Scheduler.py | 97 +++++++++++++++++++++++++++++++++++++++++++++++- lib/portage/const.py | 1 + man/make.conf.5 | 4 ++ 3 files changed, 101 insertions(+), 1 deletion(-) diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py index 30cdaff6bb..fa45b50c0f 100644 --- a/lib/_emerge/Scheduler.py +++ b/lib/_emerge/Scheduler.py @@ -245,6 +245,10 @@ class Scheduler(PollScheduler): for root in self.trees: self._config_pool[root] = [] + features = self.settings.features + self._jobserver_fd = None + self._jobserver_tokens = {} + self._fetch_log = os.path.join( _emerge.emergelog._emerge_log_dir, "emerge-fetch.log" ) @@ -263,6 +267,7 @@ class Scheduler(PollScheduler): self._pkg_queue = [] self._jobs = 0 self._running_tasks = {} + self._job_tokens = {} self._completed_tasks = set() self._main_exit = None self._main_loadavg_handle = None @@ -302,7 +307,6 @@ class Scheduler(PollScheduler): # jobs completes. self._choose_pkg_return_early = False - features = self.settings.features if "parallel-fetch" in features and not ( "--pretend" in self.myopts or "--fetch-all-uri" in self.myopts @@ -384,6 +388,7 @@ class Scheduler(PollScheduler): # will never be started, so purged it from # self._running_tasks so that it won't keep the main # loop running. + self._release_job_token(id(task)) del self._running_tasks[id(task)] for q in self._task_queues.values(): @@ -1540,6 +1545,7 @@ class Scheduler(PollScheduler): def _build_exit(self, build): self._running_tasks.pop(id(build), None) + self._release_job_token(id(build)) if build.returncode == os.EX_OK and self._terminated_tasks: # We've been interrupted, so we won't # add this to the merge queue. @@ -1552,6 +1558,7 @@ class Scheduler(PollScheduler): merge=build, scheduler=self._sched_iface, ) + # move the job token to the merge task self._running_tasks[id(merge)] = merge # By default, merge-wait only allows merge when no builds are executing. # As a special exception, dependencies on system packages are frequently @@ -1601,6 +1608,32 @@ class Scheduler(PollScheduler): def _main_loop(self): self._main_exit = self._event_loop.create_future() + if "jobserver-token" in self.settings.features: + makeflags = self.settings.get("MAKEFLAGS", "").split() + jobserver_path = None + for flag in makeflags: + if flag.startswith("--jobserver-auth="): + value = flag.removeprefix("--jobserver-auth=") + if value.startswith("fifo:"): + jobserver_path = value.removeprefix("fifo:") + else: + jobserver_path = None + print() + out = portage.output.EOutput() + out.ewarn(f"Unsupported jobserver type: {flag}") + if jobserver_path is not None: + try: + self._jobserver_fd = os.open( + jobserver_path, os.O_RDWR | os.O_NONBLOCK + ) + except OSError as exception: + out = portage.output.EOutput() + print() + out.eerror("") + out.eerror(f"Unable to connect to jobserver: {exception}") + out.eerror("") + self.terminate() + if ( self._max_load is not None and self._loadavg_latency is not None @@ -1692,6 +1725,9 @@ class Scheduler(PollScheduler): if self._schedule_merge_wakeup_task is not None: self._schedule_merge_wakeup_task.cancel() self._schedule_merge_wakeup_task = None + if self._jobserver_fd is not None: + os.close(self._jobserver_fd) + self._jobserver_fd = None def _choose_pkg(self): """ @@ -2052,6 +2088,55 @@ class Scheduler(PollScheduler): return False + def _acquire_job_token(self) -> str: + """ + Acquire a job token. Returns the token if available, or an empty value + if no jobserver is used (or the jobserver died). Raises BlockingIOError + if no tokens are available. + """ + if self._jobserver_fd is None: + return b"" + try: + return os.read(self._jobserver_fd, 1) + except BlockingIOError: + raise + except OSError as exception: + # If something failed (say, jobserver died), disable its usage + # and terminate the build (as sub-builds will likely fail too). + os.close(self._jobserver_fd) + self._jobserver_fd = None + out = portage.output.EOutput() + print() + out.eerror("") + out.eerror(f"Jobserver I/O failed, terminating: {exception}") + out.eerror("") + self.terminate() + return b"" + + def _release_job_token(self, task_id: int) -> None: + """ + Release a job token for given task. If no jobserver is running, does + nothing. + """ + token = self._jobserver_tokens.pop(task_id, None) + if token is not None and self._jobserver_fd is not None: + try: + os.write(self._jobserver_fd, token) + except OSError as exception: + # If something failed (say, jobserver died), disable its usage + # and terminate the build (as sub-builds will likely fail too). + os.close(self._jobserver_fd) + self._jobserver_fd = None + out = portage.output.EOutput() + out.eerror("") + out.eerror(f"Jobserver I/O failed, terminating: {exception}") + out.eerror("") + self.terminate() + + def _unblock_jobs(self) -> None: + self._sched_iface.remove_reader(self._jobserver_fd) + self._schedule() + def _schedule_tasks_imp(self): """ @rtype: bool @@ -2077,6 +2162,15 @@ class Scheduler(PollScheduler): if pkg is None: return state_change + if not pkg.installed: + try: + token = self._acquire_job_token() + except BlockingIOError: + # no job tokens available right now + self._pkg_queue.insert(0, pkg) + self._sched_iface.add_reader(self._jobserver_fd, self._unblock_jobs) + return state_change + state_change = True if not pkg.installed: @@ -2094,6 +2188,7 @@ class Scheduler(PollScheduler): self._jobs += 1 self._previous_job_start_time = time.time() self._status_display.running = self._jobs + self._jobserver_tokens[id(task)] = token self._running_tasks[id(task)] = task task.scheduler = self._sched_iface self._task_queues.jobs.add(task) diff --git a/lib/portage/const.py b/lib/portage/const.py index ecb897bb0a..900c30767c 100644 --- a/lib/portage/const.py +++ b/lib/portage/const.py @@ -193,6 +193,7 @@ SUPPORTED_FEATURES = frozenset( "icecream", "installsources", "ipc-sandbox", + "jobserver-token", "keeptemp", "keepwork", "lmirror", diff --git a/man/make.conf.5 b/man/make.conf.5 index b7505332a1..3d610b1406 100644 --- a/man/make.conf.5 +++ b/man/make.conf.5 @@ -568,6 +568,10 @@ and \fBbinpkg-dostrip\fR is enabled. Isolate the ebuild phase functions from host IPC namespace. Supported only on Linux. Requires IPC namespace support in kernel. .TP +.B jobserver\-token +Acquire a job token for every build from the jobserver if one is found running. +This feature is experimental and its semantics may change. +.TP .B keeptemp Do not delete the ${T} directory after the merge process. .TP
