commit: aae0fbc30a7bb56f266edb911bca34a0ea2cff21
Author: Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Sun Nov 9 18:56:38 2025 +0000
Commit: Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Sun Nov 9 18:56:38 2025 +0000
URL: https://gitweb.gentoo.org/proj/steve.git/commit/?id=aae0fbc3
Reclaim all job tokens on exit
Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>
pyproject.toml | 1 -
steve.py | 65 +++++++++++++++++++++++++++++++++++++---------------------
2 files changed, 42 insertions(+), 24 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
index f5efec8..7f4e74a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,7 +58,6 @@ extend-select = [
"ARG",
"ERA",
"PGH",
- "PERF",
"FURB",
"RUF",
]
diff --git a/steve.py b/steve.py
index f15dcc4..5606009 100755
--- a/steve.py
+++ b/steve.py
@@ -13,6 +13,7 @@ import contextlib
import fcntl
import multiprocessing
import os
+import select
import signal
import termios
from pathlib import Path
@@ -46,14 +47,45 @@ def print_state(job_file: io.FileIO) -> None:
@contextlib.contextmanager
-def hold_fifo(path: Path) -> Generator[None]:
+def hold_fifo(path: Path) -> Generator[Path]:
os.mkfifo(path)
try:
- yield
+ yield path
finally:
path.unlink()
+@contextlib.contextmanager
+def close_fd(fd: int) -> Generator[int]:
+ try:
+ yield fd
+ finally:
+ os.close(fd)
+
+
+def remove_job_tokens(pipe_fd: int, num: int) -> None:
+ """Remove num job tokens from pipe pipe_fd, open non-blocking"""
+
+ while num > 0:
+ try:
+ removed = len(os.read(pipe_fd, num))
+ except BlockingIOError:
+ # block until more data is available
+ select.select([pipe_fd], [], [])
+ else:
+ num -= removed
+ print(f"{removed} jobs removed, {num} left")
+
+
+@contextlib.contextmanager
+def job_cleanup(pipe_fd: int, expected_jobs: int) -> Generator[None]:
+ try:
+ yield
+ finally:
+ print(f"Waiting for {expected_jobs} tokens to be returned")
+ remove_job_tokens(pipe_fd, expected_jobs)
+
+
def main() -> None:
signal.signal(signal.SIGINT, exit_sig_handler)
signal.signal(signal.SIGTERM, exit_sig_handler)
@@ -104,26 +136,11 @@ def main() -> None:
return
if args.delete_jobs:
- # first, remove as many jobs as we can without blocking
- job_fd = os.open(args.path, os.O_RDONLY | os.O_NONBLOCK)
- try:
- removed = len(os.read(job_fd, args.delete_jobs))
- except BlockingIOError:
- pass
- else:
- print(f"{removed} jobs removed from {args.path}")
- args.delete_jobs -= removed
- os.close(job_fd)
-
- if args.delete_jobs == 0:
- return
-
- print("Waiting for more jobs (may block forever)")
- with open(args.path, "rb") as job_file:
- while args.delete_jobs > 0:
- job_file.read(1)
- args.delete_jobs -= 1
- print(f"Job removed, {args.delete_jobs} left to remove")
+ with close_fd(
+ os.open(args.path, os.O_RDONLY | os.O_NONBLOCK),
+ ) as job_fd:
+ print(f"Removing {args.delete_jobs} job tokens from {args.path}")
+ remove_job_tokens(job_fd, args.delete_jobs)
return
if args.jobs is None:
@@ -134,7 +151,8 @@ def main() -> None:
with contextlib.ExitStack() as context_managers:
context_managers.enter_context(hold_fifo(args.path))
- _ = os.open(args.path, os.O_RDONLY | os.O_NONBLOCK)
+ job_read_fd = os.open(args.path, os.O_RDONLY | os.O_NONBLOCK)
+ context_managers.enter_context(close_fd(job_read_fd))
job_file = args.path.open("wb")
signal.signal(
signal.SIGUSR1,
@@ -147,6 +165,7 @@ def main() -> None:
job_file.write(b"." * args.jobs)
job_file.flush()
+ context_managers.enter_context(job_cleanup(job_read_fd, args.jobs))
print(f"Started for {args.jobs} jobs")
print(f'MAKEFLAGS="--jobserver-auth=fifo:{args.path}"')