Hi Ángel,

On Fri, Jan 17, 2025 at 01:14:04AM +0100, Ángel wrote:
> The script looks good, and easy to read. It wouldn't be hard to
> translate it to another language if needed to drop the python
> dependency (but that would increase the nloc)

Thank you for the detailed review. I also picked up Julien's request for
detecting cores rather than threads and attach an updated version.

I suppose moving to some git would be good.

https://salsa.debian.org/helmutg/guess_concurrency

To get things going, I created it in my personal namespace, but I'm
happy to move it to debian or elsewhere.

> I find this behavior a bit surprising:
>
> $ python3 guess_concurrency.py --min 10 --max 2
> 10
>
> If there is a minimum limit, it is returned, even if that violates the
> max. It makes some sense to pick something but I was actually expecting
> an error to the above.

How could one disagree with that? Fixed.

> The order of processing the cpus is a bit awkward as well.
>
> The order it uses is CMAKE_BUILD_PARALLEL_LEVEL, DEB_BUILD_OPTIONS,
> RPM_BUILD_NCPUS, --detect <n>, nproc/os.cpu_count()
>
> But the order in the code is 4, 5, 3, 2, 1
> Not straightforward.

Again, how would I disagree? I have changed the order of invocation to
match the order of preference and in that process also changed
--detect=N to take precedence over all environment variables while
still preferring environment variables over other detectors. Hope that
makes sense.

> Also, it is doing actions such as running external program nproc even
> if it's going to be discarded later. (nproc is in an essential package,
> I know, but still)

This is fixed as a result of your earlier remark.

> Also, why would the user want to manually specify between nproc and
> os.cpu_count()?

While nproc is universally available on Debian, I anticipate that the
tool could be useful on non-Linux platforms and there os.cpu_count() may
be more portable. I mainly added it due to the low effort and to
demonstrate the ability to use different detection methods. The values I
see users pass to --detect are actual numbers or "cores".

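For illustration, the order of preference described above (an explicit
count first, then the environment variables, then a detector that only
runs when nothing else matched) can be sketched as follows. This is a
minimal sketch and not the attached script: parse_count,
from_deb_build_options and pick_concurrency are hypothetical names
introduced here, and the detector is passed in as a plain callable.

```python
import os
from typing import Callable, Mapping, Optional


def parse_count(text: str) -> Optional[int]:
    """Return a positive integer parsed from text, or None otherwise."""
    try:
        value = int(text)
    except ValueError:
        return None
    return value if value > 0 else None


def from_deb_build_options(environ: Mapping[str, str]) -> Optional[int]:
    """Return the first valid parallel=N found in DEB_BUILD_OPTIONS."""
    for option in environ.get("DEB_BUILD_OPTIONS", "").split():
        if option.startswith("parallel="):
            count = parse_count(option[len("parallel="):])
            if count is not None:
                return count
    return None


def pick_concurrency(
    explicit: Optional[str],
    detector: Callable[[], Optional[int]],
    environ: Mapping[str, str] = os.environ,
) -> Optional[int]:
    """Try each source in order of preference and stop at the first hit."""
    # Each candidate is a callable so that later (more expensive) sources,
    # such as running an external program, are only evaluated when needed.
    candidates = (
        lambda: parse_count(explicit) if explicit is not None else None,
        lambda: parse_count(environ.get("CMAKE_BUILD_PARALLEL_LEVEL", "")),
        lambda: from_deb_build_options(environ),
        lambda: parse_count(environ.get("RPM_BUILD_NCPUS", "")),
        detector,
    )
    for candidate in candidates:
        count = candidate()
        if count is not None:
            return count
    return None
```

Because the candidates are evaluated lazily, the order in the code is
the order of preference, and the detector is never invoked when an
explicit count or an environment variable already yields a value.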
> I would unconditionally call nproc, with a fallback to os.cpu_count()
> if that fails (I'm assuming nproc may be smarter than os.cpu_count(),
> otherwise one could use cpu_count() always)

Indeed nproc kinda is smarter as it honours some environment variables
such as OMP_NUM_THREADS and OMP_THREAD_LIMIT. Not sure we need that
fallback.

> I suggest doing:
>
> def main() -> None:
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         "--count",
>         action="store",
>         default=None,
>         metavar="NUMBER",
>         help="supply a processor count",
>     )

Is there a reason to rename it from --detect to --count? The advantage
of reusing --detect I see is that you cannot reasonably combine those
two options and --detect=9 intuitively makes sufficient sense to me.

>     args = parser.parse_args()
>     guess = None
>     try:
>         if args.count:
>             guess = positive_integer(args.count)
>     except ValueError:
>         parser.error("invalid argument to --count")
>     guess = guess or guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL")
>     guess = guess or guess_deb_build_parallel()
>     guess = guess or guess_from_environment("RPM_BUILD_NCPUS")
>     if not guess:
>         try:
>             guess = guess_nproc()
>         finally:
>             guess = guess or guess_python()

I see three main proposals in this code:

 * Order of preference reflects order of code. (implemented)
 * A given count overrides all other detection mechanisms. (implemented)
 * Fallback from nproc to os.cpu_count(). (not convinced)

Did I miss anything?

I'm more inclined to drop --detect=python entirely as you say its use is
fairly limited.

> Additionally, the --ignore argument of nproc(1) might be of use for
> this script as well.

Can you give a practical use case for this?

Helmut

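For reference only, the proposed fallback from nproc to os.cpu_count()
could be sketched as below. Note that a try/finally as in the quoted
snippet runs the finally clause but still propagates an exception
raised by guess_nproc(), so an actual fallback needs try/except. The
function name guess_nproc_with_fallback is hypothetical, and whether
such a fallback is wanted at all remains open.

```python
import os
import subprocess
from typing import Optional


def guess_nproc_with_fallback() -> Optional[int]:
    """Try coreutils' nproc first and fall back to os.cpu_count()."""
    try:
        output = subprocess.check_output(["nproc"], encoding="ascii")
        count = int(output)  # int() tolerates the trailing newline
        if count > 0:
            return count
    except (OSError, subprocess.CalledProcessError, ValueError):
        # nproc is missing, exited non-zero, or printed something that
        # is not an integer; fall through to the Python estimate.
        pass
    return os.cpu_count()
```

The result may differ between the two paths, since nproc honours
variables such as OMP_NUM_THREADS while os.cpu_count() does not.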
#!/usr/bin/python3
# Copyright 2024 Helmut Grohne <hel...@subdivi.de>
# SPDX-License-Identifier: GPL-3

import argparse
import itertools
import os
import pathlib
import re
import subprocess
import sys
import typing


def positive_integer(decstr: str) -> int:
    """Parse a positive, integral number from a string and raise a
    ValueError otherwise.

    >>> positive_integer(-1)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    >>> positive_integer(0)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    >>> positive_integer(1)
    1
    """
    value = int(decstr)  # raises ValueError
    if value < 1:
        raise ValueError("integer must be positive")
    return value


def parse_size(expression: str) -> int:
    """Parse an expression representing a data size with an optional unit
    suffix into an integer. Raises a ValueError on failure.

    >>> parse_size("5")
    5
    >>> parse_size("4KB")
    4096
    >>> parse_size("0.9g")
    966367641
    >>> parse_size("-1")
    Traceback (most recent call last):
        ...
    ValueError: number must be positive
    """
    expression = expression.lower()
    if expression.endswith("b"):
        expression = expression[:-1]
    # Binary multipliers: peta is 2**50 and exa is 2**60.
    suffixes = {
        "k": 2**10,
        "m": 2**20,
        "g": 2**30,
        "t": 2**40,
        "p": 2**50,
        "e": 2**60,
    }
    factor = 1
    if expression[-1:] in suffixes:
        factor = suffixes[expression[-1]]
        expression = expression[:-1]
    fval = float(expression)  # propagate ValueError
    value = int(fval * factor)  # propagate ValueError
    if value < 1:
        raise ValueError("number must be positive")
    return value


def guess_python() -> int | None:
    """Estimate the number of processors using Python's os.cpu_count().

    >>> guess_python() > 0
    True
    """
    return os.cpu_count()


def guess_nproc() -> int:
    """Estimate number of processors using coreutils' nproc.

    >>> guess_nproc() > 0
    True
    """
    return positive_integer(
        subprocess.check_output(["nproc"], encoding="ascii")
    )


def guess_cores() -> int | None:
    """Estimate the number of cores (not SMT threads) using /proc/cpuinfo.
    This is done by counting the number of distinct "core id" values.
    """
    cpus = set()
    try:
        with open("/proc/cpuinfo", encoding="utf8") as cf:
            for line in cf:
                if m := re.match(r"^core id\s+:\s+(\d+)$", line, re.ASCII):
                    cpus.add(int(m.group(1)))
    except FileNotFoundError:
        return None
    if not cpus:
        return None
    return len(cpus)


def guess_deb_build_parallel(
    environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Parse a possible parallel= assignment in a DEB_BUILD_OPTIONS
    environment variable.

    >>> guess_deb_build_parallel({})
    >>> guess_deb_build_parallel({"DEB_BUILD_OPTIONS": "nocheck parallel=3"})
    3
    """
    try:
        options = environ["DEB_BUILD_OPTIONS"]
    except KeyError:
        return None
    for option in options.split():
        if option.startswith("parallel="):
            option = option.removeprefix("parallel=")
            try:
                return positive_integer(option)
            except ValueError:
                pass
    return None


def guess_from_environment(
    variable: str, environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Read a number from an environment variable.

    >>> guess_from_environment("CPUS", {"CPUS": 4})
    4
    >>> guess_from_environment("CPUS", {"other": 3})
    """
    try:
        return positive_integer(environ[variable])
    except (KeyError, ValueError):
        return None


def guess_memavailable() -> int:
    """Estimate the available memory from /proc/meminfo in bytes."""
    with open("/proc/meminfo", encoding="ascii") as fh:
        for line in fh:
            if line.startswith("MemAvailable:"):
                line = line.removeprefix("MemAvailable:").strip()
                return 1024 * positive_integer(line.removesuffix("kB"))
    raise RuntimeError("no MemAvailable line found in /proc/meminfo")


def guess_cgroup_memory() -> int | None:
    """Return the smallest "memory.high" or "memory.max" limit of the
    current cgroup or any parent of it if any.
    """
    guess: int | None = None
    mygroup = pathlib.PurePath(
        pathlib.Path("/proc/self/cgroup")
        .read_text(encoding=sys.getfilesystemencoding())
        .strip()
        .split(":", 2)[2]
    ).relative_to("/")
    sfc = pathlib.Path("/sys/fs/cgroup")
    for group in itertools.chain((mygroup,), mygroup.parents):
        for entry in ("memory.max", "memory.high"):
            try:
                value = positive_integer(
                    (sfc / group / entry).read_text(encoding="ascii")
                )
            except (FileNotFoundError, ValueError):
                pass
            else:
                if guess is None:
                    guess = value
                else:
                    guess = min(guess, value)
    return guess


def parse_required_memory(expression: str) -> list[int]:
    """Parse comma-separated list of memory expressions. Empty expressions
    copy the previous entry.

    >>> parse_required_memory("1k,9,,1")
    [1024, 9, 9, 1]
    """
    values: list[int] = []
    for memexpr in expression.split(","):
        if not memexpr:
            if values:
                values.append(values[-1])
            else:
                raise ValueError("initial memory expression cannot be empty")
        else:
            values.append(parse_size(memexpr))
    return values


def guess_memory_concurrency(memory: int, usage: list[int]) -> int:
    """Estimate the maximum number of cores that can be used given the
    available memory and a sequence of per-core memory consumption.

    >>> guess_memory_concurrency(4, [1])
    4
    >>> guess_memory_concurrency(10, [5, 4, 3])
    2
    >>> guess_memory_concurrency(2, [3])
    1
    """
    concurrency = 0
    # All entries but the last describe one core each; the last entry
    # applies to every further core.
    for use in usage[:-1]:
        if use > memory:
            break
        memory -= use
        concurrency += 1
    else:
        concurrency += memory // usage[-1]
    return max(1, concurrency)


def clamp(
    value: int, lower: int | None = None, upper: int | None = None
) -> int:
    """Return an adjusted value that does not exceed the lower or upper
    limits if any.

    >>> clamp(5, upper=4)
    4
    >>> clamp(9, 2)
    9
    """
    if upper is not None and upper < value:
        value = upper
    if lower is not None and lower > value:
        value = lower
    return value


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--detect",
        action="store",
        default="nproc",
        metavar="METHOD",
        help="supply a processor count or select a detection method "
        "(nproc, python or cores)",
    )
    parser.add_argument(
        "--max",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given maximum",
    )
    parser.add_argument(
        "--min",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given minimum",
    )
    parser.add_argument(
        "--require-mem",
        action="store",
        type=parse_required_memory,
        default=[],
        metavar="MEMLIST",
        help="specify per-core required memory as a comma separated list",
    )
    args = parser.parse_args()
    if args.min is not None and args.max is not None and args.min > args.max:
        parser.error("--min value larger than --max value")

    guess = None
    detectfunc = None
    for detector in args.detect.split(","):
        try:
            # A plain number given to --detect overrides everything else.
            guess = positive_integer(detector)
        except ValueError:
            try:
                detectfunc = {
                    "nproc": guess_nproc,
                    "python": guess_python,
                    "cores": guess_cores,
                }[detector]
            except KeyError:
                parser.error("invalid argument to --detect")

    if guess is None:
        assert detectfunc is not None
        # Order of evaluation matches the order of preference; the
        # detector only runs if no environment variable matched.
        guess = (
            guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL")
            or guess_deb_build_parallel()
            or guess_from_environment("RPM_BUILD_NCPUS")
            or detectfunc()
        )
    if guess is None:
        print("failed to guess processor count", file=sys.stderr)
        sys.exit(1)

    if args.require_mem and guess > 1:
        memory = clamp(guess_memavailable(), upper=guess_cgroup_memory())
        guess = clamp(
            guess, upper=guess_memory_concurrency(memory, args.require_mem)
        )
    guess = clamp(guess, args.min, args.max)
    print(guess)


if __name__ == "__main__":
    main()