This patch adds the bpf-vmtest-tool subdirectory under contrib which tests
BPF programs under a live kernel using a QEMU VM.  It automatically
builds the specified kernel version with eBPF support enabled
and stores it under "~/.bpf-vmtest-tool", which is reused for future
invocations.

It can also compile BPF C source files or BPF bytecode objects and
test them against the kernel verifier for errors.  When a BPF program
is rejected by the kernel verifier, the verifier logs are displayed.

$ python3 main.py -k 6.15 --bpf-src assets/ebpf-programs/fail.c
BPF program failed to load
Verifier logs:
        btf_vmlinux is malformed
        0: R1=ctx() R10=fp0
        0: (81) r0 = *(s32 *)(r10 +4)
        invalid read from stack R10 off=4 size=4
        processed 1 insns (limit 1000000) max_states_per_insn 0 total_states 0 
peak_states 0 mark_read 0

See the README for more examples.

The script uses vmtest (https://github.com/danobi/vmtest) to boot
the VM and run the program.  By default, it uses the host's root
("/") as the VM rootfs via the 9p filesystem, so only the kernel is
replaced during testing.

Tested with Python 3.9 and above.

contrib/ChangeLog:

        * bpf-vmtest-tool/.gitignore: New file.
        * bpf-vmtest-tool/README: New file.
        * bpf-vmtest-tool/bpf.py: New file.
        * bpf-vmtest-tool/config.py: New file.
        * bpf-vmtest-tool/kernel.py: New file.
        * bpf-vmtest-tool/main.py: New file.
        * bpf-vmtest-tool/pyproject.toml: New file.
        * bpf-vmtest-tool/tests/test_cli.py: New file.
        * bpf-vmtest-tool/utils.py: New file.
        * bpf-vmtest-tool/vm.py: New file.

Signed-off-by: Piyush Raj <piyushraj92...@gmail.com>
---
 contrib/bpf-vmtest-tool/.gitignore        |  23 +++
 contrib/bpf-vmtest-tool/README            |  78 ++++++++
 contrib/bpf-vmtest-tool/bpf.py            | 189 +++++++++++++++++++
 contrib/bpf-vmtest-tool/config.py         |  18 ++
 contrib/bpf-vmtest-tool/kernel.py         | 209 ++++++++++++++++++++++
 contrib/bpf-vmtest-tool/main.py           | 101 +++++++++++
 contrib/bpf-vmtest-tool/pyproject.toml    |  36 ++++
 contrib/bpf-vmtest-tool/tests/test_cli.py | 167 +++++++++++++++++
 contrib/bpf-vmtest-tool/utils.py          |  27 +++
 contrib/bpf-vmtest-tool/vm.py             | 154 ++++++++++++++++
 10 files changed, 1002 insertions(+)
 create mode 100644 contrib/bpf-vmtest-tool/.gitignore
 create mode 100644 contrib/bpf-vmtest-tool/README
 create mode 100644 contrib/bpf-vmtest-tool/bpf.py
 create mode 100644 contrib/bpf-vmtest-tool/config.py
 create mode 100644 contrib/bpf-vmtest-tool/kernel.py
 create mode 100644 contrib/bpf-vmtest-tool/main.py
 create mode 100644 contrib/bpf-vmtest-tool/pyproject.toml
 create mode 100644 contrib/bpf-vmtest-tool/tests/test_cli.py
 create mode 100644 contrib/bpf-vmtest-tool/utils.py
 create mode 100644 contrib/bpf-vmtest-tool/vm.py

diff --git a/contrib/bpf-vmtest-tool/.gitignore 
b/contrib/bpf-vmtest-tool/.gitignore
new file mode 100644
index 00000000000..723dfe1d0f4
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/.gitignore
@@ -0,0 +1,23 @@
+.gitignore_local
+.python-version
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[codz]
+*$py.class
+
+# Unit test / coverage reports
+.pytest_cache/
+
+
+# Environments
+.env
+.envrc
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Ruff stuff:
+.ruff_cache/
diff --git a/contrib/bpf-vmtest-tool/README b/contrib/bpf-vmtest-tool/README
new file mode 100644
index 00000000000..52e1e1b9253
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/README
@@ -0,0 +1,78 @@
+This directory contains a Python script to run BPF programs or shell commands
+under a live Linux kernel using QEMU virtual machines.
+
+USAGE
+=====
+
+To run a shell command inside a live kernel VM:
+
+    python main.py -k 6.15 -r / -c "uname -a"
+
+To run a BPF source file in the VM:
+
+    python main.py -k 6.15  --bpf-src fail.c
+
+To run a precompiled BPF object file:
+
+    python main.py -k 6.15 --bpf-obj fail.bpf.o
+
+The tool will download and build the specified kernel version from:
+
+    https://www.kernel.org/pub/linux/kernel
+
+A prebuilt `bzImage` can be supplied using the `--kernel-image` flag.
+
+NOTE
+====
+- Only x86_64 is supported
+- Only "/" (the root filesystem) is currently supported as the VM rootfs when
+running or testing BPF programs using `--bpf-src` or `--bpf-obj`.
+
+DEPENDENCIES
+============
+
+- Python >= 3.9
+- vmtest >= v0.18.0 (https://github.com/danobi/vmtest)
+    - QEMU
+    - qemu-guest-agent
+
+For compiling kernel
+- https://docs.kernel.org/process/changes.html#current-minimal-requirements
+For compiling and loading BPF programs:
+
+- libbpf
+- bpftool
+- gcc-bpf-unknown-none
+       (https://gcc.gnu.org/wiki/BPFBackEnd#Where_to_find_GCC_BPF)
+- vmlinux.h
+    Can be generated using:
+
+    bpftool btf dump file /sys/kernel/btf/vmlinux format c > \
+    /usr/local/include/vmlinux.h
+
+    Or downloaded from https://github.com/libbpf/vmlinux.h/tree/main
+
+BUILD FLAGS
+===========
+You can customize compiler settings using environment variables:
+
+- BPF_CC:       Compiler for the BPF program (default: bpf-unknown-none-gcc)
+- BPF_CFLAGS:   Extra flags for BPF program compilation (default: "-O2")
+- BPF_INCLUDES: Include paths for BPF (default: -I/usr/local/include 
-I/usr/include)
+- CC:           Compiler for the user-space loader (default: gcc)
+- CFLAGS:       Flags for compiling the loader (default: -g -Wall)
+- LDFLAGS:      Linker flags for the loader (default: -lelf -lz -lbpf)
+
+Example usage:
+
+    BPF_CFLAGS="-O3 -g" CFLAGS="-O2" python main.py -k 6.15  --bpf-src fail.c
+
+DEVELOPMENT
+===========
+
+Development dependencies are specified in `pyproject.toml`, which can be used
+with any suitable Python virtual environment manager.
+
+To run the test suite:
+
+    python3 -m pytest
diff --git a/contrib/bpf-vmtest-tool/bpf.py b/contrib/bpf-vmtest-tool/bpf.py
new file mode 100644
index 00000000000..cc4ea3f3bf9
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/bpf.py
@@ -0,0 +1,189 @@
+import re
+import subprocess
+import logging
+from pathlib import Path
+import tempfile
+from typing import Optional
+import utils
+from config import BPF_CC, BPF_CFLAGS, BPF_INCLUDES, LDFLAGS, CC, CFLAGS,ARCH
+
+logger = logging.getLogger(__name__)
+
+# https://git.sr.ht/~brianwitte/gcc-bpf-example/tree/master/item/Makefile
+
+
+def generate_sanitized_name(path: Path):
+    """generate sanitized c variable name"""
+    name = re.sub(r"\W", "_", path.stem)
+    if name and name[0].isdigit():
+        name = "_" + name
+    return name
+
+
+class BPFProgram:
+    tmp_base_dir = tempfile.TemporaryDirectory(prefix="vmtest")
+    tmp_base_dir_path = Path(tmp_base_dir.name)
+
+    def __init__(
+        self,
+        source_path: Optional[Path] = None,
+        bpf_bytecode_path: Optional[Path] = None,
+        use_temp_dir: bool = False,
+    ):
+        path = source_path or bpf_bytecode_path
+        self.name = generate_sanitized_name(path)
+        self.build_dir = self.__class__.tmp_base_dir_path / "ebpf_programs" / 
self.name
+
+        if source_path:
+            self.bpf_src = source_path
+            self.bpf_obj = self.build_dir / f"{self.name}.bpf.o"
+        else:
+            self.bpf_obj = bpf_bytecode_path
+        self.build_dir.mkdir(parents=True, exist_ok=True)
+        self.bpf_skel = self.build_dir / f"{self.name}.skel.h"
+        self.loader_src = self.build_dir / f"{self.name}-loader.c"
+        self.output = self.build_dir / f"{self.name}.o"
+
+    @classmethod
+    def from_source(cls, source_path: Path):
+        self = cls(source_path=source_path)
+        self._compile_bpf()
+        self._compile_from_bpf_bytecode()
+        return self.output
+
+    @classmethod
+    def from_bpf_obj(cls, obj_path: Path):
+        self = cls(bpf_bytecode_path=obj_path)
+        self._compile_from_bpf_bytecode()
+        return self.output
+
+    def _compile_from_bpf_bytecode(self):
+        self._generate_skeleton()
+        self._compile_loader()
+
+    def _compile_bpf(self):
+        """Compile the eBPF program using gcc"""
+        logger.info(f"Compiling eBPF source: {self.bpf_src}")
+        cmd = [
+            BPF_CC,
+            f"-D__TARGET_ARCH_{ARCH}",
+            "-gbtf",
+            "-std=gnu17"]
+        if BPF_CFLAGS != "":
+            cmd.extend(BPF_CFLAGS.split(" "))
+        cmd.extend(BPF_INCLUDES.split())
+        cmd.extend([
+            "-c",
+            str(self.bpf_src),
+            "-o",
+            str(self.bpf_obj),
+        ])
+        logger.debug("".join(cmd))
+        utils.run_command(cmd)
+        logger.info(f"eBPF compiled: {self.bpf_obj}")
+
+    def _generate_skeleton(self):
+        """Generate the BPF skeleton header using bpftool"""
+        logger.info(f"Generating skeleton: {self.bpf_skel}")
+        cmd = ["bpftool", "gen", "skeleton", str(self.bpf_obj), "name", 
self.name]
+        try:
+            result = utils.run_command(cmd)
+            with open(self.bpf_skel, "w") as f:
+                f.write(result.stdout)
+            logger.info("Skeleton generated.")
+            logger.debug("bpftool output:\n%s", result.stdout)
+            if result.stderr:
+                logger.debug("bpftool output:\n%s", result.stderr)
+        except subprocess.CalledProcessError as e:
+            logger.error("Failed to generate skeleton.")
+            logger.error("stdout:\n%s", e.stdout)
+            logger.error("stderr:\n%s", e.stderr)
+            raise
+
+    def _compile_loader(self):
+        """Compile the C loader program"""
+        self.generate_loader()
+        logger.info(f"Compiling loader: {self.loader_src}")
+        cmd = [
+            CC,
+            *CFLAGS.split(" "),
+            "-I",
+            str(self.build_dir),
+            str(self.loader_src),
+            *LDFLAGS.split(" "),
+            "-o",
+            str(self.output),
+        ]
+        utils.run_command(cmd)
+        logger.info("Compilation complete")
+
+    def generate_loader(self):
+        """
+        Generate a loader C file for the given BPF skeleton.
+
+        Args:
+            bpf_name (str): Name of the BPF program (e.g. "prog").
+            output_path (str): Path to write loader.c.
+        """
+        skeleton_header = f"{self.name}.skel.h"
+        loader_code = f"""\
+        #include <stdio.h>
+        #include <stdlib.h>
+        #include <signal.h>
+        #include <unistd.h>
+        #include <bpf/libbpf.h>
+        #include "{skeleton_header}"
+
+        #define LOG_BUF_SIZE 1024 * 1024
+
+        static volatile sig_atomic_t stop;
+        static char log_buf[LOG_BUF_SIZE];
+
+        void handle_sigint(int sig) {{
+            stop = 1;
+        }}
+
+        int main() {{
+            struct {self.name} *skel;
+            struct bpf_program *prog;
+            int err;
+
+            signal(SIGINT, handle_sigint);
+
+            skel = {self.name}__open();  // STEP 1: open only
+            if (!skel) {{
+                fprintf(stderr, "Failed to open BPF skeleton\\n");
+                return 1;
+            }}
+
+            // STEP 2: Get the bpf_program object for the main program
+            bpf_object__for_each_program(prog, skel->obj) {{
+                bpf_program__set_log_buf(prog, log_buf, sizeof(log_buf));
+                bpf_program__set_log_level(prog, 1); // optional: verbose logs
+            }}
+
+            // STEP 3: Load the program (this will trigger verifier log output)
+            err = {self.name}__load(skel);
+            fprintf(
+               stderr,
+               "--- Verifier log start ---\\n"
+               "%s\\n"
+               "--- Verifier log end ---\\n",
+               log_buf
+               );
+            if (err) {{
+                fprintf(stderr, "Failed to load BPF skeleton: %d\\n", err);
+                {self.name}__destroy(skel);
+                return 1;
+            }}
+
+            printf("BPF program loaded successfully.\\n");
+
+            {self.name}__destroy(skel);
+            return 0;
+        }}
+
+        """
+        with open(self.loader_src, "w") as f:
+            f.write(loader_code)
+        logger.info(f"Generated loader at {self.loader_src}")
diff --git a/contrib/bpf-vmtest-tool/config.py 
b/contrib/bpf-vmtest-tool/config.py
new file mode 100644
index 00000000000..e758d52a97f
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/config.py
@@ -0,0 +1,18 @@
+import platform
+from pathlib import Path
+import os
+
+KERNEL_TARBALL_PREFIX_URL = "https://cdn.kernel.org/pub/linux/kernel/";
+BASE_DIR = Path.home() / ".bpf-vmtest-tool"
+ARCH = platform.machine()
+KCONFIG_REL_PATHS = [
+    "tools/testing/selftests/bpf/config",
+    "tools/testing/selftests/bpf/config.vm",
+    f"tools/testing/selftests/bpf/config.{ARCH}",
+]
+CC = os.getenv("CC", "gcc")
+CFLAGS = os.getenv("CFLAGS", "-g -Wall")
+LDFLAGS = os.getenv("LDFLAGS", "-lelf -lz -lbpf")
+BPF_CC = os.getenv("BPF_CC", "bpf-unknown-none-gcc")
+BPF_CFLAGS = os.getenv("BPF_CFLAGS", "-O2")
+BPF_INCLUDES = os.getenv("BPF_INCLUDES", "-I/usr/local/include -I/usr/include")
diff --git a/contrib/bpf-vmtest-tool/kernel.py 
b/contrib/bpf-vmtest-tool/kernel.py
new file mode 100644
index 00000000000..2974948130e
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/kernel.py
@@ -0,0 +1,209 @@
+import logging
+import os
+import shutil
+import subprocess
+from pathlib import Path
+import re
+from urllib.parse import urljoin
+from urllib.request import urlretrieve
+from typing import Optional, List
+from dataclasses import dataclass
+
+from config import ARCH, BASE_DIR, KCONFIG_REL_PATHS, KERNEL_TARBALL_PREFIX_URL
+import utils
+
+logger = logging.getLogger(__name__)
+KERNELS_DIR = BASE_DIR / "kernels"
+
+
+@dataclass
+class KernelSpec:
+    """Immutable kernel specification"""
+
+    version: str
+    arch: str = ARCH
+
+    def __post_init__(self):
+        self.major = self.version.split(".")[0]
+
+    def __str__(self):
+        return f"{self.version}-{self.arch}"
+
+    @property
+    def bzimage_path(self) -> Path:
+        return KERNELS_DIR / f"bzImage-{self}"
+
+    @property
+    def tarball_path(self) -> Path:
+        return KERNELS_DIR / f"linux-{self.version}.tar.xz"
+
+    @property
+    def kernel_dir(self) -> Path:
+        return KERNELS_DIR / f"linux-{self.version}"
+
+
+class KernelImage:
+    """Represents a compiled kernel image"""
+
+    def __init__(self, path: Path):
+        if not isinstance(path, Path):
+            path = Path(path)
+
+        if not path.exists():
+            raise FileNotFoundError(f"Kernel image not found: {path}")
+
+        self.path = path
+
+    def __str__(self):
+        return str(self.path)
+
+
+class KernelCompiler:
+    """Handles complete kernel compilation process including download and 
build"""
+
+    def compile_from_version(self, spec: KernelSpec) -> KernelImage:
+        """Complete compilation process from kernel version"""
+        if spec.bzimage_path.exists():
+            logger.info(f"Kernel {spec} already exists, skipping compilation")
+            return KernelImage(spec.bzimage_path)
+
+        try:
+            self._download_source(spec)
+            self._extract_source(spec)
+            self._configure_kernel(spec)
+            self._compile_kernel(spec)
+            self._copy_bzimage(spec)
+
+            logger.info(f"Successfully compiled kernel {spec}")
+            return KernelImage(spec.bzimage_path)
+
+        except Exception as e:
+            logger.error(f"Failed to compile kernel {spec}: {e}")
+            raise
+        finally:
+            # Always cleanup temporary files
+            self._cleanup(spec)
+
+    def _download_source(self, spec: KernelSpec) -> None:
+        """Download kernel source tarball"""
+        if spec.tarball_path.exists():
+            logger.info(f"Tarball already exists: {spec.tarball_path}")
+            return
+
+        url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz"
+        url = urljoin(KERNEL_TARBALL_PREFIX_URL, url_suffix)
+
+        logger.info(f"Downloading kernel from {url}")
+        spec.tarball_path.parent.mkdir(parents=True, exist_ok=True)
+        urlretrieve(url, spec.tarball_path)
+        logger.info("Kernel source downloaded")
+
+    def _extract_source(self, spec: KernelSpec) -> None:
+        """Extract kernel source tarball"""
+        logger.info(f"Extracting kernel source to {spec.kernel_dir}")
+        spec.kernel_dir.mkdir(parents=True, exist_ok=True)
+
+        utils.run_command(
+            [
+                "tar",
+                "-xf",
+                str(spec.tarball_path),
+                "-C",
+                str(spec.kernel_dir),
+                "--strip-components=1",
+            ]
+        )
+
+    def _configure_kernel(self, spec: KernelSpec) -> None:
+        """Configure kernel with provided config files"""
+        config_path = spec.kernel_dir / ".config"
+
+        with open(config_path, "wb") as kconfig:
+            for config_rel_path in KCONFIG_REL_PATHS:
+                config_abs_path = spec.kernel_dir / config_rel_path
+                if config_abs_path.exists():
+                    with open(config_abs_path, "rb") as conf:
+                        kconfig.write(conf.read())
+
+        logger.info("Updated kernel configuration")
+
+    def _compile_kernel(self, spec: KernelSpec) -> None:
+        """Compile the kernel"""
+        logger.info(f"Compiling kernel in {spec.kernel_dir}")
+        old_cwd = os.getcwd()
+
+        try:
+            os.chdir(spec.kernel_dir)
+            utils.run_command(["make", "olddefconfig"])
+            utils.run_command(["make", f"-j{os.cpu_count()}", "bzImage"])
+        except subprocess.CalledProcessError as e:
+            logger.error(f"Kernel compilation failed: {e}")
+            raise
+        finally:
+            os.chdir(old_cwd)
+
+    def _copy_bzimage(self, spec: KernelSpec) -> None:
+        """Copy compiled bzImage to final location"""
+        src = spec.kernel_dir / "arch/x86/boot/bzImage"
+        dest = spec.bzimage_path
+        dest.parent.mkdir(parents=True, exist_ok=True)
+
+        shutil.copy2(src, dest)
+        logger.info(f"Stored bzImage at {dest}")
+
+    def _cleanup(self, spec: KernelSpec) -> None:
+        """Clean up temporary files"""
+        if spec.tarball_path.exists():
+            spec.tarball_path.unlink()
+            logger.info("Removed tarball")
+
+        if spec.kernel_dir.exists():
+            shutil.rmtree(spec.kernel_dir)
+            logger.info("Removed kernel source directory")
+
+
+class KernelManager:
+    """Main interface for kernel management"""
+
+    def __init__(self):
+        self.compiler = KernelCompiler()
+
+    def get_kernel_image(
+        self,
+        version: Optional[str] = None,
+        kernel_image_path: Optional[str] = None,
+        arch: str = ARCH,
+    ) -> KernelImage:
+        """Get kernel image from version or existing file"""
+
+        # Validate inputs
+        if not version and not kernel_image_path:
+            raise ValueError("Must provide either 'version' or 
'kernel_image_path'")
+
+        if version and kernel_image_path:
+            raise ValueError("Provide only one of 'version' or 
'kernel_image_path'")
+
+        # Handle existing kernel image
+        if kernel_image_path:
+            path = Path(kernel_image_path)
+            if not path.exists():
+                raise FileNotFoundError(f"Kernel image not found: 
{kernel_image_path}")
+            return KernelImage(path)
+
+        # Handle version-based compilation
+        if version:
+            spec = KernelSpec(version=version, arch=arch)
+            return self.compiler.compile_from_version(spec)
+
+    def list_available_kernels(self) -> List[str]:
+        """List all available compiled kernels"""
+        if not KERNELS_DIR.exists():
+            return []
+
+        kernels = []
+        for file in KERNELS_DIR.glob("bzImage-*"):
+            match = re.match(r"bzImage-(.*)", file.name)
+            if match:
+                kernels.append(match.group(1))
+
+        return sorted(kernels)
diff --git a/contrib/bpf-vmtest-tool/main.py b/contrib/bpf-vmtest-tool/main.py
new file mode 100644
index 00000000000..bd408badef1
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/main.py
@@ -0,0 +1,101 @@
+import argparse
+import logging
+from pathlib import Path
+import textwrap
+
+import bpf
+import kernel
+import vm
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    kernel_group = parser.add_mutually_exclusive_group(required=True)
+    kernel_group.add_argument(
+        "-k",
+        "--kernel",
+        help="Kernel version to boot in the vm",
+        metavar="VERSION",
+        type=str,
+    )
+    kernel_group.add_argument(
+        "--kernel-image",
+        help="Kernel image to boot in the vm",
+        metavar="PATH",
+        type=str,
+    )
+    parser.add_argument(
+        "-r", "--rootfs", help="rootfs to mount in the vm", default="/", 
metavar="PATH"
+    )
+    parser.add_argument(
+        "-v",
+        "--log-level",
+        help="Log level",
+        metavar="DEBUG|INFO|WARNING|ERROR",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
+        default="ERROR",
+    )
+    command_group = parser.add_mutually_exclusive_group(required=True)
+    command_group.add_argument(
+        "--bpf-src",
+        help="Path to BPF C source file",
+        metavar="PATH",
+        type=str,
+    )
+    command_group.add_argument(
+        "--bpf-obj",
+        help="Path to bpf bytecode object",
+        metavar="PATH",
+        type=str,
+    )
+    command_group.add_argument(
+        "-c", "--command", help="command to run in the vm", metavar="COMMAND"
+    )
+    command_group.add_argument(
+        "-s", "--shell", help="open interactive shell in the vm", 
action="store_true"
+    )
+
+    args = parser.parse_args()
+
+    logging.basicConfig(level=args.log_level)
+    logger = logging.getLogger(__name__)
+    kmanager = kernel.KernelManager()
+
+    if args.kernel:
+        kernel_image = kmanager.get_kernel_image(version=args.kernel)
+    elif args.kernel_image:
+        kernel_image = 
kmanager.get_kernel_image(kernel_image_path=args.kernel_image)
+
+    if args.bpf_src:
+        command = bpf.BPFProgram.from_source(Path(args.bpf_src))
+    elif args.bpf_obj:
+        command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj))
+    elif args.command:
+        command = args.command
+    elif args.shell:
+        # todo: somehow pass to hyperwiser that you need to attach stdin as 
well
+        # command = "/bin/bash"
+        raise NotImplementedError
+
+    virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs, 
str(command))
+    try:
+        result = virtual_machine.execute()
+    except vm.BootFailedError as e:
+        logger.error("VM boot failure: execution aborted. See logs for 
details.")
+        print(e)
+        exit(e.returncode)
+    if args.bpf_src or args.bpf_obj:
+        if result.returncode == 0:
+            print("BPF programs succesfully loaded")
+        else:
+            if "Failed to load BPF skeleton" in result.stdout:
+                print("BPF program failed to load")
+                print("Verifier logs:")
+                print(textwrap.indent(vm.bpf_verifier_logs(result.stdout), 
"\t"))
+    elif args.command:
+        print(result.stdout)
+    exit(result.returncode)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/contrib/bpf-vmtest-tool/pyproject.toml 
b/contrib/bpf-vmtest-tool/pyproject.toml
new file mode 100644
index 00000000000..1977612cfd6
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/pyproject.toml
@@ -0,0 +1,36 @@
+[project]
+name = "bpf-vmtest-tool"
+version = "0.1.0"
+description = "Test BPF code against live kernels"
+readme = "README.md"
+requires-python = ">=3.9"
+dependencies = []
+
+[dependency-groups]
+dev = [
+    "pre-commit>=4.2.0",
+    "pytest>=8.4.0",
+    "pytest-sugar>=1.0.0",
+    "ruff>=0.11.13",
+    "tox>=4.26.0",
+]
+
+[tool.pytest.ini_options]
+addopts = [
+    "--import-mode=importlib",
+]
+testpaths = ["tests"]
+pythonpath = ["."]
+
+[tool.ruff.lint]
+select = [
+    # pycodestyle
+    "E",
+    # Pyflakes
+    "F",
+]
+# Allow fix for all enabled rules (when `--fix`) is provided.
+fixable = ["ALL"]
+
+# Allow unused variables when underscore-prefixed.
+dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
\ No newline at end of file
diff --git a/contrib/bpf-vmtest-tool/tests/test_cli.py 
b/contrib/bpf-vmtest-tool/tests/test_cli.py
new file mode 100644
index 00000000000..53df3ca3663
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/tests/test_cli.py
@@ -0,0 +1,167 @@
+import sys
+from unittest import mock
+import pytest
+from bpf import BPFProgram
+import kernel
+import main
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture
+def openat_bpf_source(tmp_path):
+    openat_bpf = tmp_path / "openat_bpf.c"
+    openat_bpf.write_text(r"""
+    #include "vmlinux.h"
+    #include <bpf/bpf_helpers.h>
+    #include <bpf/bpf_tracing.h>
+    #include <bpf/bpf_core_read.h>
+
+    char LICENSE[] SEC("license") = "GPL";
+
+    int example_pid = 0;
+
+    SEC("tracepoint/syscalls/sys_enter_openat")
+    int handle_openat(struct trace_event_raw_sys_enter *ctx)
+    {
+        int pid = bpf_get_current_pid_tgid() >> 32;
+        char filename[256]; // filename buffer
+        bpf_probe_read_user(&filename, sizeof(filename), (void *)ctx->args[1]);
+        bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", pid,
+            filename);
+
+        return 0;
+    }
+
+    """)
+    return openat_bpf
+
+
+@pytest.fixture
+def openat_bpf_obj(openat_bpf_source):
+    bpf_program = BPFProgram(source_path=openat_bpf_source)
+    bpf_program._compile_bpf()
+    return bpf_program.bpf_obj
+
+
+@pytest.fixture
+def invalid_memory_access_bpf_source(tmp_path):
+    invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c"
+    invalid_memory_access_bpf.write_text(r"""
+    #include "vmlinux.h"
+    #include <bpf/bpf_helpers.h>
+    #include <bpf/bpf_tracing.h>
+
+    char LICENSE[] SEC("license") = "GPL";
+
+    SEC("tracepoint/syscalls/sys_enter_openat")
+    int bpf_prog(struct trace_event_raw_sys_enter *ctx) {
+        int arr[4] = {1, 2, 3, 4};
+
+        // Invalid memory access: out-of-bounds
+        int val = arr[5];  // This causes the verifier to fail
+
+        return val;
+    }
+    """)
+    return invalid_memory_access_bpf
+
+
+@pytest.fixture
+def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source):
+    bpf_program = BPFProgram(source_path=invalid_memory_access_bpf_source)
+    bpf_program._compile_bpf()
+    return bpf_program.bpf_obj
+
+
+def run_main_with_args_and_capture_output(args, capsys):
+    with mock.patch.object(sys, "argv", args):
+        try:
+            main.main()
+        except SystemExit as e:
+            result = capsys.readouterr()
+            output = result.out.rstrip()
+            error = result.err.rstrip()
+            logger.debug("STDOUT:\n%s", output)
+            logger.debug("STDERR:\n%s", error)
+            return e.code, output, error
+
+
+kernel_image_path = kernel.KernelManager().get_kernel_image(version="6.15")
+kernel_cli_flags = [["--kernel", "6.15"], ["--kernel-image", 
f"{kernel_image_path}"]]
+
+
+@pytest.mark.parametrize("kernel_args", kernel_cli_flags)
+class TestCLI:
+    def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, capsys):
+        args = [
+            "main.py",
+            *kernel_args,
+            "--rootfs",
+            "/",
+            "--bpf-src",
+            str(openat_bpf_source),
+        ]
+        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+        assert code == 0
+        assert "BPF programs succesfully loaded" == output
+
+    def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj, 
capsys):
+        args = [
+            "main.py",
+            *kernel_args,
+            "--rootfs",
+            "/",
+            "--bpf-obj",
+            str(openat_bpf_obj),
+        ]
+        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+        assert code == 0
+        assert "BPF programs succesfully loaded" == output
+
+    def test_main_with_invalid_bpf(
+        self, kernel_args, invalid_memory_access_bpf_source, capsys
+    ):
+        args = [
+            "main.py",
+            *kernel_args,
+            "--rootfs",
+            "/",
+            "--bpf-src",
+            str(invalid_memory_access_bpf_source),
+        ]
+        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+        output_lines = output.splitlines()
+        assert code == 1
+        assert "BPF program failed to load" == output_lines[0]
+        assert "Verifier logs:" == output_lines[1]
+
+    def test_main_with_invalid_bpf_obj(
+        self, kernel_args, invalid_memory_access_bpf_obj, capsys
+    ):
+        args = [
+            "main.py",
+            *kernel_args,
+            "--rootfs",
+            "/",
+            "--bpf-obj",
+            str(invalid_memory_access_bpf_obj),
+        ]
+        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+        output_lines = output.splitlines()
+        assert code == 1
+        assert "BPF program failed to load" == output_lines[0]
+        assert "Verifier logs:" == output_lines[1]
+
+    def test_main_with_valid_command(self, kernel_args, capsys):
+        args = ["main.py", *kernel_args, "--rootfs", "/", "-c", "uname -r"]
+        code, output, error = run_main_with_args_and_capture_output(args, 
capsys)
+        assert code == 0
+        assert "6.15.0" == output
+
+    def test_main_with_invalid_command(self, kernel_args, capsys):
+        args = ["main.py", *kernel_args, "--rootfs", "/", "-c", 
"NotImplemented"]
+        code, output, error = run_main_with_args_and_capture_output(args, 
capsys)
+        assert code != 0
+        assert f"Command failed with exit code: {code}" in output
diff --git a/contrib/bpf-vmtest-tool/utils.py b/contrib/bpf-vmtest-tool/utils.py
new file mode 100644
index 00000000000..ef911d8c851
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/utils.py
@@ -0,0 +1,27 @@
+import subprocess
+import logging
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+
+def run_command(cmd:list[str], **kwargs: Any):
+    logger.debug(f"Running command: {' '.join(cmd)}")
+    try:
+        logger.debug(f"running command: {cmd}")
+        result = subprocess.run(
+            cmd,
+            text=True,
+            check=True,
+            capture_output=True,
+            shell=False,
+            **kwargs,
+        )
+        logger.debug("Command stdout:\n" + result.stdout.strip())
+        if result.stderr:
+            logger.debug("Command stderr:\n" + result.stderr.strip())
+        return result
+    except subprocess.CalledProcessError as e:
+        logger.error("Command failed with stdout:\n" + e.stdout.strip())
+        logger.error("Command failed with stderr:\n" + e.stderr.strip())
+        raise
diff --git a/contrib/bpf-vmtest-tool/vm.py b/contrib/bpf-vmtest-tool/vm.py
new file mode 100644
index 00000000000..5d4a5747f0b
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/vm.py
@@ -0,0 +1,154 @@
+import logging
+import subprocess
+from typing import List
+
+from kernel import KernelImage
+
+logger = logging.getLogger(__name__)
+
+
+class VMConfig:
+    """Configuration container for VM settings"""
+
+    def __init__(
+        self, kernel_image: KernelImage, rootfs_path: str, command: str, 
**kwargs
+    ):
+        self.kernel = kernel_image
+        self.kernel_path = str(kernel_image.path)
+        self.rootfs_path = rootfs_path
+        self.command = command
+        self.memory_mb = kwargs.get("memory_mb", 512)
+        self.cpu_count = kwargs.get("cpu_count", 1)
+        self.extra_args = kwargs.get("extra_args", {})
+
+
+def bpf_verifier_logs(output: str) -> str:
+    start_tag = "--- Verifier log start ---"
+    end_tag = "--- Verifier log end ---"
+
+    start_idx = output.find(start_tag)
+    end_idx = output.find(end_tag)
+
+    if start_idx != -1 and end_idx != -1:
+        # Extract between the tags (excluding the markers themselves)
+        log_body = output[start_idx + len(start_tag) : end_idx].strip()
+        return log_body
+    else:
+        return "No verifier log found in the output."
+
+
+class Vmtest:
+    """vmtest backend implementation"""
+
+    def __init__(self):
+        pass
+
+    def _boot_command(self, vm_config: VMConfig):
+        vmtest_command = ["vmtest"]
+        vmtest_command.extend(["-k", vm_config.kernel_path])
+        vmtest_command.extend(["-r", vm_config.rootfs_path])
+        vmtest_command.append(vm_config.command)
+        return vmtest_command
+
+    def _remove_boot_log(self, full_output: str) -> str:
+        """
+        Filters QEMU and kernel boot logs, returning only the output after the
+        `===> Running command` marker.
+        """
+        marker = "===> Running command"
+        lines = full_output.splitlines()
+
+        try:
+            start_index = next(i for i, line in enumerate(lines) if marker in 
line)
+            # Return everything after that marker (excluding the marker itself)
+            return "\n".join(lines[start_index + 1 :]).strip()
+        except StopIteration:
+            return full_output.strip()
+
+    def run_command(self, vm_config):
+        vm = None
+        try:
+            logger.info(f"Booting VM with kernel: {vm_config.kernel_path}")
+            logger.info(f"Using rootfs: {vm_config.rootfs_path}")
+            vm = subprocess.run(
+                self._boot_command(vm_config),
+                check=True,
+                text=True,
+                capture_output=True,
+                shell=False,
+            )
+            vm_stdout = vm.stdout
+            logger.debug(vm_stdout)
+            return VMCommandResult(
+                vm.returncode, self._remove_boot_log(vm_stdout), None
+            )
+        except subprocess.CalledProcessError as e:
+            out = e.stdout
+            err = e.stderr
+            # when the command in the vm fails we consider it as a succesfull 
boot
+            if "===> Running command" not in out:
+                raise BootFailedError("Boot failed", out, err, e.returncode)
+            logger.debug("STDOUT: \n%s", out)
+            logger.debug("STDERR: \n%s", err)
+            return VMCommandResult(e.returncode, self._remove_boot_log(out), 
err)
+
+
+class VMCommandResult:
+    def __init__(self, returncode, stdout, stderr) -> None:
+        self.returncode = returncode
+        self.stdout = stdout
+        self.stderr = stderr
+
+
+class VirtualMachine:
+    """Main VM class - simple interface for end users"""
+
+    # Registry of available hypervisors
+    _hypervisors = {
+        "vmtest": Vmtest,
+    }
+
+    def __init__(
+        self,
+        kernel_image: KernelImage,
+        rootfs_path: str,
+        command: str,
+        hypervisor_type: str = "vmtest",
+        **kwargs,
+    ):
+        self.config = VMConfig(kernel_image, rootfs_path, command, **kwargs)
+
+        if hypervisor_type not in self._hypervisors:
+            raise ValueError(f"Unsupported hypervisor: {hypervisor_type}")
+
+        self.hypervisor = self._hypervisors[hypervisor_type]()
+
+    @classmethod
+    def list_hypervisors(cls) -> List[str]:
+        """List available hypervisors"""
+        return list(cls._hypervisors.keys())
+
+    def execute(self):
+        """Execute command in VM"""
+        return self.hypervisor.run_command(self.config)
+
+
+class BootFailedError(Exception):
+    """Raised when VM fails to boot properly (before command execution)."""
+
+    def __init__(
+        self, message: str, stdout: str = "", stderr: str = "", returncode: 
int = -1
+    ):
+        super().__init__(message)
+        self.stdout = stdout
+        self.stderr = stderr
+        self.returncode = returncode
+
+    def __str__(self):
+        base = super().__str__()
+        return (
+            f"{base}\n"
+            f"Return code: {self.returncode}\n"
+            f"--- STDOUT ---\n{self.stdout}\n"
+            f"--- STDERR ---\n{self.stderr}"
+        )
-- 
2.50.0

Reply via email to