On Wed, Sep 03, 2025 at 04:37:05PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> Add test for a new feature of local TAP migration with fd passing
> through unix socket.
>
> Signed-off-by: Vladimir Sementsov-Ogievskiy <[email protected]>
> ---
> .../test_x86_64_tap_fd_migration.py | 347 ++++++++++++++++++
> 1 file changed, 347 insertions(+)
> create mode 100644 tests/functional/test_x86_64_tap_fd_migration.py
>
> diff --git a/tests/functional/test_x86_64_tap_fd_migration.py
> b/tests/functional/test_x86_64_tap_fd_migration.py
> new file mode 100644
> index 0000000000..f6d18fe39f
> --- /dev/null
> +++ b/tests/functional/test_x86_64_tap_fd_migration.py
> @@ -0,0 +1,347 @@
> +#!/usr/bin/env python3
> +#
> +# Functional test that tests TAP local migration
> +# with fd passing
> +#
> +# Copyright (c) Yandex
> +#
> +# SPDX-License-Identifier: GPL-2.0-or-later
> +
> +import os
> +import time
> +import subprocess
> +import signal
> +from typing import Tuple
> +
> +from qemu_test import (
> + LinuxKernelTest,
> + Asset,
> + exec_command_and_wait_for_pattern,
> +)
> +
> +GUEST_IP = "10.0.1.2"
> +GUEST_IP_MASK = f"{GUEST_IP}/24"
> +GUEST_MAC = "d6:0d:75:f8:0f:b7"
> +HOST_IP = "10.0.1.1"
> +HOST_IP_MASK = f"{HOST_IP}/24"
> +TAP_ID = "tap0"
> +TAP_MAC = "e6:1d:44:b5:03:5d"
> +
> +
> +def run(cmd: str, check: bool = True) -> None:
> + subprocess.run(cmd, check=check, shell=True)
I don't see need to be using shell here - just use
"check_call()" and pass the argv as an array instead
of a string at which point this 'run' helper can be
removed.
> +
> +
> +def fetch(cmd: str, check: bool = True) -> str:
> + return subprocess.run(
> + cmd, check=check, shell=True, stdout=subprocess.PIPE, text=True
> + ).stdout
> +
> +
> +def del_tap() -> None:
> + run(f"ip tuntap del {TAP_ID} mode tap multi_queue", check=False)
> +
> +
> +def init_tap() -> None:
> + run(f"ip tuntap add dev {TAP_ID} mode tap multi_queue")
> + run(f"ip link set dev {TAP_ID} address {TAP_MAC}")
> + run(f"ip addr add {HOST_IP_MASK} dev {TAP_ID}")
> + run(f"ip link set {TAP_ID} up")
$ ip tuntap add dev foo mode tap multi_queue
ioctl(TUNSETIFF): Operation not permitted
The functional tests run as the developer's normal unprivileged user
account, so it doesn't look like this can work ?
Were you testing this as root ?
> +
> +
> +def parse_ping_line(line: str) -> float:
> + # suspect lines like
> + # [1748524876.590509] 64 bytes from 94.245.155.3 \
> + # (94.245.155.3): icmp_seq=1 ttl=250 time=101 ms
> + spl = line.split()
> + return float(spl[0][1:-1])
> +
> +
> +def parse_ping_output(out) -> Tuple[bool, float, float]:
> + lines = [x for x in out.split("\n") if x.startswith("[")]
> +
> + try:
> + first_no_ans = next(
> + (ind for ind in range(len(lines)) if lines[ind][20:26] == "no
> ans")
> + )
> + except StopIteration:
> + return False, parse_ping_line(lines[0]), parse_ping_line(lines[-1])
> +
> + last_no_ans = next(
> + (ind for ind in range(len(lines) - 1, -1, -1) if lines[ind][20:26]
> == "no ans")
> + )
> +
> + return (
> + True,
> + parse_ping_line(lines[first_no_ans]),
> + parse_ping_line(lines[last_no_ans]),
> + )
> +
> +
> +def wait_migration_finish(source_vm, target_vm):
> + migr_events = (
> + ("MIGRATION", {"data": {"status": "completed"}}),
> + ("MIGRATION", {"data": {"status": "failed"}}),
> + )
> +
> + source_e = source_vm.events_wait(migr_events)["data"]
> + target_e = target_vm.events_wait(migr_events)["data"]
> +
> + source_s = source_vm.cmd("query-status")["status"]
> + target_s = target_vm.cmd("query-status")["status"]
> +
> + assert (
> + source_e["status"] == "completed"
> + and target_e["status"] == "completed"
> + and source_s == "postmigrate"
> + and target_s == "paused"
> + ), f"""Migration failed:
> + SRC status: {source_s}
> + SRC event: {source_e}
> + TGT status: {target_s}
> + TGT event:{target_e}"""
> +
> +
> +class VhostUserBlkFdMigration(LinuxKernelTest):
> +
> + ASSET_KERNEL = Asset(
> + (
> +
> "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
> + "/31/Server/x86_64/os/images/pxeboot/vmlinuz"
> + ),
> + "d4738d03dbbe083ca610d0821d0a8f1488bebbdccef54ce33e3adb35fda00129",
> + )
> +
> + ASSET_INITRD = Asset(
> + (
> +
> "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
> + "/31/Server/x86_64/os/images/pxeboot/initrd.img"
> + ),
> + "277cd6c7adf77c7e63d73bbb2cded8ef9e2d3a2f100000e92ff1f8396513cd8b",
> + )
> +
> + ASSET_ALPINE_ISO = Asset(
> + (
> + "https://dl-cdn.alpinelinux.org/"
> + "alpine/v3.22/releases/x86_64/alpine-standard-3.22.1-x86_64.iso"
> + ),
> + "96d1b44ea1b8a5a884f193526d92edb4676054e9fa903ad2f016441a0fe13089",
> + )
> +
> + def setUp(self):
> + super().setUp()
> +
> + init_tap()
> +
> + self.outer_ping_proc = None
> +
> + def tearDown(self):
> + del_tap()
> +
> + if self.outer_ping_proc:
> + self.stop_outer_ping()
> +
> + super().tearDown()
> +
> + def start_outer_ping(self) -> None:
> + assert self.outer_ping_proc is None
> + self.outer_ping_log = open("/tmp/ping.log", "w")
> + self.outer_ping_proc = subprocess.Popen(
> + ["ping", "-i", "0", "-O", "-D", GUEST_IP],
> + text=True,
> + stdout=self.outer_ping_log,
> + )
Surely outer_ping_log can be closed immediately as the child
process will keep the FD open ?
> +
> + def stop_outer_ping(self) -> str:
> + assert self.outer_ping_proc
> + self.outer_ping_proc.send_signal(signal.SIGINT)
> +
> + self.outer_ping_proc.communicate(timeout=5)
> + self.outer_ping_proc = None
> + self.outer_ping_log.close()
> +
> + # We need the start, the end and several lines around "no answer"
> + cmd = "cat /tmp/ping.log | grep -A 4 -B 4 'PING\\|packets\\|no ans'"
> + return fetch(cmd)
IMHO this can just read the whole of /tmp/ping.log directly into memory
and then the parse_ping_output can jjust match on it with a regex,
avoiding the pre-processing with grep.
> +
> + def stop_ping_and_check(self, stop_time, resume_time):
> + ping_res = self.stop_outer_ping()
> +
> + discon, a, b = parse_ping_output(ping_res)
> +
> + if not discon:
> + text = f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: {a}
> - {b}"
> + if a > stop_time or b < resume_time:
> + self.fail(f"PING failed: {text}")
> + self.log.info(f"PING: no packets lost: {text}")
> + return
> +
> + text = (
> + f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: disconnect:
> {a} - {b}"
> + )
> + self.log.info(text)
> + eps = 0.01
> + if a < stop_time - eps or b > resume_time + eps:
> + self.fail(text)
> +
> + def one_ping_from_guest(self, vm) -> None:
> + exec_command_and_wait_for_pattern(
> + self,
> + f"ping -c 1 -W 1 {HOST_IP}",
> + "1 packets transmitted, 1 packets received",
> + "1 packets transmitted, 0 packets received",
> + vm=vm,
> + )
> + self.wait_for_console_pattern("# ", vm=vm)
> +
> + def one_ping_from_host(self) -> None:
> + run(f"ping -c 1 -W 1 {GUEST_IP}")
> +
> + def setup_shared_memory(self):
> + shm_path = f"/dev/shm/qemu_test_{os.getpid()}"
> +
> + try:
> + with open(shm_path, "wb") as f:
> + f.write(b"\0" * (1024 * 1024 * 1024)) # 1GB
> + except Exception as e:
> + self.fail(f"Failed to create shared memory file: {e}")
> +
> + return shm_path
> +
> + def prepare_and_launch_vm(self, shm_path, vhost, incoming=False,
> vm=None):
> + if not vm:
> + vm = self.vm
> +
> + vm.set_console()
> + vm.add_args("-accel", "kvm")
> + vm.add_args("-device", "pcie-pci-bridge,id=pci.1,bus=pcie.0")
> + vm.add_args("-m", "1G")
> +
> + vm.add_args(
> + "-object",
> +
> f"memory-backend-file,id=ram0,size=1G,mem-path={shm_path},share=on",
> + )
> + vm.add_args("-machine", "memory-backend=ram0")
> +
> + vm.add_args(
> + "-drive",
> f"file={self.ASSET_ALPINE_ISO.fetch()},media=cdrom,format=raw"
> + )
> +
> + vm.add_args("-S")
> +
> + if incoming:
> + vm.add_args("-incoming", "defer")
> +
> + vm_s = "target" if incoming else "source"
> + self.log.info(f"Launching {vm_s} VM")
> + vm.launch()
> +
> + self.set_migration_capabilities(vm)
> + self.add_virtio_net(vm, vhost, incoming)
> +
> + def add_virtio_net(self, vm, vhost: bool, incoming: bool = False):
> + netdev_params = {
> + "id": "netdev.1",
> + "vhost": vhost,
> + "type": "tap",
> + "ifname": "tap0",
> + "downscript": "no",
> + "queues": 4,
> + }
> +
> + if incoming:
> + netdev_params["local-incoming"] = True
> + else:
> + netdev_params["script"] = "no"
> +
> + vm.cmd("netdev_add", netdev_params)
> +
> + vm.cmd(
> + "device_add",
> + driver="virtio-net-pci",
> + romfile="",
> + id="vnet.1",
> + netdev="netdev.1",
> + mq=True,
> + vectors=18,
> + bus="pci.1",
> + mac=GUEST_MAC,
> + disable_legacy="off",
> + )
> +
> + def set_migration_capabilities(self, vm):
> + capabilities = [
> + {"capability": "events", "state": True},
> + {"capability": "x-ignore-shared", "state": True},
> + {"capability": "local-tap", "state": True},
> + ]
> + vm.cmd("migrate-set-capabilities", {"capabilities": capabilities})
> +
> + def setup_guest_network(self) -> None:
> + exec_command_and_wait_for_pattern(self, "ip addr", "# ")
> + exec_command_and_wait_for_pattern(
> + self,
> + f"ip addr add {GUEST_IP_MASK} dev eth0 && ip link set eth0 up &&
> echo OK",
> + "OK",
> + )
> + self.wait_for_console_pattern("# ")
> +
> + def do_test_tap_fd_migration(self, vhost):
> + self.require_accelerator("kvm")
> + self.set_machine("q35")
> +
> + socket_dir = self.socket_dir()
> + migration_socket = os.path.join(socket_dir.name, "migration.sock")
> +
> + shm_path = self.setup_shared_memory()
> +
> + self.prepare_and_launch_vm(shm_path, vhost)
> + self.vm.cmd("cont")
> + self.wait_for_console_pattern("login:")
> + exec_command_and_wait_for_pattern(self, "root", "# ")
> +
> + self.setup_guest_network()
> +
> + self.one_ping_from_guest(self.vm)
> + self.one_ping_from_host()
> + self.start_outer_ping()
> +
> + # Get some successful pings before migration
> + time.sleep(0.5)
> +
> + target_vm = self.get_vm(name="target")
> + self.prepare_and_launch_vm(shm_path, vhost, incoming=True,
> vm=target_vm)
> +
> + target_vm.cmd("migrate-incoming", {"uri":
> f"unix:{migration_socket}"})
> +
> + self.log.info("Starting migration")
> + freeze_start = time.time()
> + self.vm.cmd("migrate", {"uri": f"unix:{migration_socket}"})
> +
> + self.log.info("Waiting for migration completion")
> + wait_migration_finish(self.vm, target_vm)
> +
> + target_vm.cmd("cont")
> + freeze_end = time.time()
> +
> + self.vm.shutdown()
> +
> + self.log.info("Verifying PING on target VM after migration")
> + self.one_ping_from_guest(target_vm)
> + self.one_ping_from_host()
> +
> + # And a bit more pings after source shutdown
> + time.sleep(0.3)
> + self.stop_ping_and_check(freeze_start, freeze_end)
> +
> + target_vm.shutdown()
> +
> + def test_tap_fd_migration(self):
> + self.do_test_tap_fd_migration(False)
> +
> + def test_tap_fd_migration_vhost(self):
> + self.do_test_tap_fd_migration(True)
> +
> +
> +if __name__ == "__main__":
> + LinuxKernelTest.main()
> --
> 2.48.1
>
With regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|