Hi Ángel,

On Fri, Jan 17, 2025 at 01:14:04AM +0100, Ángel wrote:
> The script looks good, and easy to read. It wouldn't be hard to
> translate it to another language if needed to drop the python
> dependency (but that would increase the nloc)

thank you for the detailed review. I also picked up Julien's request for
detecting cores rater than threads and attach an updated version. I
suppose moving to some git would be good.

https://salsa.debian.org/helmutg/guess_concurrency
To get things going, I created it in my personal namespace, but I'm
happy to move it to debian or elsewhere.

> I find this behavior a bit surprising:
> 
> $ python3 guess_concurrency.py --min 10 --max 2
> 10
> 
> If there is a minimum limit, it is returned, even if that violates the
> max. It makes some sense to pick something but I as actually expecting
> an error to the above.

How could one disagree with that? Fixed.

> The order of processing the cpus is a bit awkward as well.
> 
> The order it uses is CMAKE_BUILD_PARALLEL_LEVEL, DEB_BUILD_OPTIONS,
> RPM_BUILD_NCPUS, --detect <n>, nproc/os.cpu_count()
> 
> But the order in the code is 4, 5, 3, 2, 1
> Not straightforward.

Again, how would I disagree? I have changed the order of invocation to
match the order of preference and in that process also changed
--detect=N to take precendence over all environment variables while
still preferring environment variables over other detectors. Hope that
makes sense.

> Also, it is doing actions such as running external program nproc even
> it if's going to be discarded later. (nproc is in an essential package,
> I know, but still)

This is fixed as a result of your earlier remark.

> Also, why would the user want to manually specify between nptoc and
> os.cpu_count()?

While nproc is universally available on Debian, I anticipate that the
tool could be useful on non-Linux platforms and there os.cpu_count() may
be more portable. I mainly added it due to the low effort and to
demonstrate the ability to use different detection methods. The values I
see users pass to --detect are actual numbers or "cores".

> I would unconditionally call nproc, with a fallback to os.cpu_count()
> if that fails (I'm assuming nproc may be smarter than os.cpu_count(),
> otherwise one could use cpu_count() always)

Indeed nproc kinda is smarter as it honours some environment variables
such as OMP_NUM_THREADS and OMP_THREAD_LIMIT. Not sure we need that
fallback.

> I suggest doing:
> 
> def main() -> None:
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         "--count",
>         action="store",
>         default=None,
>         metavar="NUMBER",
>         help="supply a processor count",
>     )

Is there a reason to rename it from --detect to --count? The advantage
of reusing --detect I see is that you cannot reasonably combine those
two options and --detect=9 intuitively makes sufficient sense to me.

>     args = parser.parse_args()
>     guess = None
>     try:
>         if args.count:
>             guess = positive_integer(args.count)
>     except ValueError:
>         parser.error("invalid argument to --count")
>     guess = guess or guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL")
>     guess = guess or guess_deb_build_parallel()
>     guess = guess or guess_from_environment("RPM_BUILD_NCPUS")
>     if not guess:
>         try:
>             guess = guess_nproc()
>         finally:
>             guess = guess or guess_python()

I see three main proposal in this code:
 * Order of preference reflects order of code. (implemented)
 * A given count overrides all other detection mechanisms. (implemented)
 * Fallback from nproc to os.cpucount(). (not convinced)
Did I miss anything?

I'm more inclined to drop --detect=python entirely as you say its use is
fairly limited.

> Additionally, the --ignore argument of nproc(1) might be of use for
> this script as well.

Can you give a practical use case for this?

Helmut
#!/usr/bin/python3
# Copyright 2024 Helmut Grohne <hel...@subdivi.de>
# SPDX-License-Identifier: GPL-3

import argparse
import itertools
import os
import pathlib
import re
import subprocess
import sys
import typing


def positive_integer(decstr: str) -> int:
    """Parse a positive, integral number from a string and raise a ValueError
    otherwise.

    >>> positive_integer(-1)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    >>> positive_integer(0)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    >>> positive_integer(1)
    1
    """
    value = int(decstr)  # raises ValueError
    if value < 1:
        raise ValueError("integer must be positive")
    return value


def parse_size(expression: str) -> int:
    """Parse an expression representing a data size with an optional unit
    suffix into an integer. Raises a ValueError on failure.

    >>> parse_size("5")
    5
    >>> parse_size("4KB")
    4096
    >>> parse_size("0.9g")
    966367641
    >>> parse_size("-1")
    Traceback (most recent call last):
        ...
    ValueError: number must be positive
    """
    expression = expression.lower()
    if expression.endswith("b"):
        expression = expression[:-1]
    suffixes = {
        "k": 2**10,
        "m": 2**20,
        "g": 2**30,
        "t": 2**40,
        "e": 2**50,
    }
    factor = 1
    if expression[-1:] in suffixes:
        factor = suffixes[expression[-1]]
        expression = expression[:-1]
    fval = float(expression)  # propagate ValueError
    value = int(fval * factor)  # propagate ValueError
    if value < 1:
        raise ValueError("number must be positive")
    return value


def guess_python() -> int | None:
    """Estimate the number of processors using Python's os.cpu_count().

    >>> guess_python() > 0
    True
    """
    return os.cpu_count()


def guess_nproc() -> int:
    """Estimate number of processors using coreutils' nproc.

    >>> guess_nproc() > 0
    True
    """
    return positive_integer(
        subprocess.check_output(["nproc"], encoding="ascii")
    )


def guess_cores() -> int | None:
    """Estimate the number of cores (not SMT threads) using /proc/cpuinfo.
    This is done by counting the number of distinct "core id" values.
    """
    cpus = set()
    try:
        with open("/proc/cpuinfo", encoding="utf8") as cf:
            for line in cf:
                if m := re.match(r"^core id\s+:\s+(\d+)$", line, re.ASCII):
                    cpus.add(int(m.group(1)))
    except FileNotFoundError:
        return None
    if not cpus:
        return None
    return len(cpus)


def guess_deb_build_parallel(
    environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Parse a possible parallel= assignment in a DEB_BUILD_OPTIONS environment
    variable.

    >>> guess_deb_build_parallel({})
    >>> guess_deb_build_parallel({"DEB_BUILD_OPTIONS": "nocheck parallel=3"})
    3
    """
    try:
        options = environ["DEB_BUILD_OPTIONS"]
    except KeyError:
        return None
    for option in options.split():
        if option.startswith("parallel="):
            option = option.removeprefix("parallel=")
            try:
                return positive_integer(option)
            except ValueError:
                pass
    return None


def guess_from_environment(
    variable: str, environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Read a number from an environment variable.

    >>> guess_from_environment("CPUS", {"CPUS": 4})
    4
    >>> guess_from_environment("CPUS", {"other": 3})
    """
    try:
        return positive_integer(environ[variable])
    except (KeyError, ValueError):
        return None


def guess_memavailable() -> int:
    """Estimate the available memory from /proc/meminfo in bytes."""
    with open("/proc/meminfo", encoding="ascii") as fh:
        for line in fh:
            if line.startswith("MemAvailable:"):
                line = line.removeprefix("MemAvailable:").strip()
                return 1024 * positive_integer(line.removesuffix("kB"))
    raise RuntimeError("no MemAvailable line found in /proc/meminfo")


def guess_cgroup_memory() -> int | None:
    """Return the smalles "memory.high" or "memory.max" limit of the current
    cgroup or any parent of it if any.
    """
    guess: int | None = None
    mygroup = pathlib.PurePath(
        pathlib.Path("/proc/self/cgroup")
        .read_text(encoding=sys.getfilesystemencoding())
        .strip()
        .split(":", 2)[2]
    ).relative_to("/")
    sfc = pathlib.Path("/sys/fs/cgroup")
    for group in itertools.chain((mygroup,), mygroup.parents):
        for entry in ("memory.max", "memory.high"):
            try:
                value = positive_integer(
                    (sfc / group / entry).read_text(encoding="ascii")
                )
            except (FileNotFoundError, ValueError):
                pass
            else:
                if guess is None:
                    guess = value
                else:
                    guess = min(guess, value)
    return guess


def parse_required_memory(expression: str) -> list[int]:
    """Parse comma-separated list of memory expressions. Empty expressions copy
    the previous entry.

    >>> parse_required_memory("1k,9,,1")
    [1024, 9, 9, 1]
    """
    values: list[int] = []
    for memexpr in expression.split(","):
        if not memexpr:
            if values:
                values.append(values[-1])
            else:
                raise ValueError("initial memory expression cannot be empty")
        else:
            values.append(parse_size(memexpr))
    return values


def guess_memory_concurrency(memory: int, usage: list[int]) -> int:
    """Estimate the maximum number of cores that can be used given the
    available memory and a sequence of per-core memory consumption.

    >>> guess_memory_concurrency(4, [1])
    4
    >>> guess_memory_concurrency(10, [5, 4, 3])
    2
    >>> guess_memory_concurrency(2, [3])
    1
    """
    concurrency = 0
    for use in usage[:-1]:
        if use > memory:
            break
        memory -= use
        concurrency += 1
    else:
        concurrency += memory // usage[-1]
    return max(1, concurrency)


def clamp(
    value: int, lower: int | None = None, upper: int | None = None
) -> int:
    """Return an adjusted value that does not exceed the lower or upper limits
    if any.

    >>> clamp(5, upper=4)
    4
    >>> clamp(9, 2)
    9
    """
    if upper is not None and upper < value:
        value = upper
    if lower is not None and lower > value:
        value = lower
    return value


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--detect",
        action="store",
        default="nproc",
        metavar="METHOD",
        help="supply a processor count or select a detection method"
        "(nproc, python or cores)",
    )
    parser.add_argument(
        "--max",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given maximum",
    )
    parser.add_argument(
        "--min",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given minimum",
    )
    parser.add_argument(
        "--require-mem",
        action="store",
        type=parse_required_memory,
        default=[],
        metavar="MEMLIST",
        help="specify per-core required memory as a comma separated list",
    )
    args = parser.parse_args()
    if args.min is not None and args.max is not None and args.min > args.max:
        parser.error("--min value larger than --max value")

    guess = None
    detectfunc = None
    for detector in args.detect.split(","):
        try:
            guess = positive_integer(detector)
        except ValueError:
            try:
                detectfunc = {
                    "nproc": guess_nproc,
                    "python": guess_python,
                    "cores": guess_cores,
                }[detector]
            except KeyError:
                parser.error("invalid argument to --detect")
    if guess is None:
        assert detectfunc is not None
        guess = (
            guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL")
            or guess_deb_build_parallel()
            or guess_from_environment("RPM_BUILD_NCPUS")
            or detectfunc()
        )
    if guess is None:
        print("failed to guess processor count", file=sys.stderr)
        sys.exit(1)
    if args.require_mem and guess > 1:
        memory = clamp(guess_memavailable(), upper=guess_cgroup_memory())
        guess = clamp(
            guess, upper=guess_memory_concurrency(memory, args.require_mem)
        )
    guess = clamp(guess, args.min, args.max)
    print(guess)


if __name__ == "__main__":
    main()

Reply via email to