#!/usr/bin/env python3 import argparse import atexit import binascii import http.server import os import platform import select import shutil import signal import socket import socketserver import subprocess import sys import tempfile import time is_verbose = False def print_verbose(s): if is_verbose: print(s) def print_error(s): print(s, file=sys.stderr) def exit_error(s): print_error(s) sys.exit(1) def bool_arg(val): return "on" if val else "off" def find_qemu(arch): binary_names = [ f"qemu-system-{arch}" ] if arch == platform.machine(): binary_names.append("qemu-kvm") for binary_name in binary_names: if "QEMU_BUILD_DIR" in os.environ: p = os.path.join(os.environ["QEMU_BUILD_DIR"], binary_name) if os.path.isfile(p): return p else: exit_error(f"Can't find {binary_name}") qemu_bin_dirs = ["/usr/bin", "/usr/libexec"] if "PATH" in os.environ: qemu_bin_dirs += os.environ["PATH"].split(":") for d in qemu_bin_dirs: p = os.path.join(d, binary_name) if os.path.isfile(p): return p exit_error(f"Can't find {binary_name}") def qemu_available_accels(qemu): cmd = qemu + ' -accel help' info = subprocess.check_output(cmd.split(" ")).decode('utf-8') accel_list = [] for accel in ('kvm', 'xen', 'hvf', 'hax', 'tcg'): if info.find(accel) > 0: accel_list.append(accel) return accel_list def random_id(): return binascii.b2a_hex(os.urandom(8)).decode('utf8') def machine_id(): try: with open("/etc/machine-id", "r") as f: mid = f.read().strip() except FileNotFoundError: if sys.platform == "darwin": # for macOS import plistlib cmd = "ioreg -rd1 -c IOPlatformExpertDevice -a" plist_data = subprocess.check_output(cmd.split(" ")) mid = plistlib.loads(plist_data)[0]["IOPlatformUUID"].replace("-","") else: # fallback for the other distros hostname = socket.gethostname() mid = ''.join(hex(ord(x))[2:] for x in (hostname*16)[:16]) return mid def generate_mac_address(): # create a new mac address based on our machine id data = machine_id() maclst = ["FE"] + [data[x:x+2] for x in range(-12, -2, 2)] return ":".join(maclst) def run_http_server(path): writer, reader = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) child_pid = os.fork() if child_pid == 0: reader.close() # Child os.chdir(path) class HTTPHandler(http.server.SimpleHTTPRequestHandler): def log_message(self, format, *args): pass # Silence logs httpd = socketserver.TCPServer(("127.0.0.1", 0), HTTPHandler) writer.send(str(httpd.server_address[1]).encode("utf8")) writer.close() httpd.serve_forever() sys.exit(0) # Parent writer.close() atexit.register(os.kill, child_pid,signal.SIGTERM) http_port = int(reader.recv(128).decode("utf8")) reader.close() return http_port def find_ovmf(args): dirs = [ "~/.local/share/ovmf", "/usr/share/OVMF", "/usr/share/edk2/ovmf/" ] if args.ovmf_dir: dirs.append(args.ovmf_dir) for d in dirs: path = os.path.expanduser(d) if os.path.exists(path): return path raise RuntimeError("Could not find OMVF") # location can differ depending on how qemu is installed def find_edk2(): dirs = [ "/usr/local/share/qemu", "/opt/homebrew/share/qemu" ] for d in dirs: path = os.path.expanduser(d) if os.path.exists(path): return path raise RuntimeError("Could not find edk2 directory") def qemu_run_command(qmp_socket_path, command): sock2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock2.connect(qmp_socket_path) r = sock2.recv(1024) sock2.send('{"execute":"qmp_capabilities"}\n'.encode("utf8")) r = sock2.recv(1024) sock2.send(f'{command}\n'.encode("utf8")) r = sock2.recv(1024) sock2.close() def virtio_serial_connect(virtio_socket_path): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) while True: time.sleep(0.1) try: sock.connect(virtio_socket_path) return sock except FileNotFoundError: pass def available_tcp_port(port_range_from = 1024): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) port = port_range_from port_range_to = port_range_from + 32 # limit for retry while port < port_range_to: try: s.bind(('', port)) except OSError: port += 1 continue break s.close() return port class WatchdogCommand: START = 1 STOP = 2 def __init__(self, op, arg = None): self.op = op self.arg = arg def parse_watchdog_commands(sock): commands = [] data = sock.recv(16).decode("utf8") for l in data.splitlines(): if l.startswith("START"): try: arg = int(l[5:]) except ValueError: arg = 30 # Default if not specified commands.append( WatchdogCommand(WatchdogCommand.START, arg) ) elif l.startswith("STOP"): commands.append( WatchdogCommand(WatchdogCommand.STOP) ) else: print_verbose(f"Unsupported watchdog command {l}") return commands def run_watchdog(watch_socket_path, qmp_socket_path): sock = virtio_serial_connect(watch_socket_path) p = select.poll() p.register(sock, select.POLLIN) watchdog_timeout = None watchdog_delay = 30 while True: timeout = None if watchdog_timeout != None: timeout = max(watchdog_timeout - time.time(), 0) * 1000 poll_res = p.poll(timeout) if len(poll_res) > 0: v = poll_res[0][1] if v & select.POLLHUP: sys.exit(0) commands = parse_watchdog_commands(sock) for cmd in commands: if cmd.op == WatchdogCommand.START: print_verbose(f"Starting watchdog for {cmd.arg} sec") watchdog_timeout = time.time() + cmd.arg if cmd.op == WatchdogCommand.STOP: print_verbose(f"Stopped watchdog") watchdog_timeout = None if watchdog_timeout != None and time.time() >= watchdog_timeout: print_verbose(f"Triggering watchdog") qemu_run_command(qmp_socket_path, '{"execute": "system_reset"}') # Queue a new timeout in case the next boot fails, until disabled watchdog_timeout = time.time() + watchdog_delay def main(): parser = argparse.ArgumentParser(description="Boot virtual machine images") parser.add_argument("--verbose", default=False, action="store_true") parser.add_argument("--arch", default=platform.machine(), action="store", help=f"Arch to run for (default {platform.machine()})") parser.add_argument("--publish-dir", action="store", help=f"Publish the specified directory over http in the vm") parser.add_argument("--memory", default="2G", help=f"Memory size (default 2G)") parser.add_argument("--nographics", default=False, action="store_true", help=f"Run without graphics") parser.add_argument("--watchdog", default=False, action="store_true", help=f"Enable watchdog") parser.add_argument("--tpm2", default=False, action="store_true", help=f"Enable TPM2") parser.add_argument("--snapshot", default=False, action="store_true", help=f"Work on a snapshot of the image") parser.add_argument("--ovmf-dir", action="store", help="Specify directory for OVMF files (Open Virtual Machine Firmware)") parser.add_argument("--secureboot", dest="secureboot", action="store_true", default=False, help="Enable SecureBoot") parser.add_argument("--ssh-port", type=int, default=2222, help="SSH port forwarding to SSH_PORT (default 2222)") parser.add_argument("--cdrom", action="store", help="Specify .iso to load") parser.add_argument("image", type=str, help="The image to boot") parser.add_argument('extra_args', nargs=argparse.REMAINDER, metavar="...", help="extra qemu arguments") args = parser.parse_args(sys.argv[1:]) global is_verbose is_verbose = args.verbose # arm64 is an alias for aarch64 on macOS if args.arch == "arm64": args.arch = "aarch64" qemu = find_qemu(args.arch) accel_list = qemu_available_accels(qemu) qemu_args = [qemu] if args.arch == "x86_64": machine = "q35" default_cpu = "qemu64,+ssse3,+sse4.1,+sse4.2,+popcnt" ovmf = find_ovmf(args) if args.secureboot: qemu_args += [ "-drive", f"file={ovmf}/OVMF_CODE.secboot.fd,if=pflash,format=raw,unit=0,readonly=on", "-drive", f"file={ovmf}/OVMF_VARS.secboot.fd,if=pflash,format=raw,unit=1,snapshot=on,readonly=off", ] else: qemu_args += [ "-drive", f"file={ovmf}/OVMF_CODE.fd,if=pflash,format=raw,unit=0,readonly=on", "-drive", f"file={ovmf}/OVMF_VARS.fd,if=pflash,format=raw,unit=1,snapshot=on,readonly=off", ] elif args.arch == "aarch64": machine = "virt" default_cpu = "cortex-a57" if sys.platform == "darwin": edk2 = find_edk2() qemu_args += [ "-device", "virtio-gpu-pci", # for display "-display", "default,show-cursor=on", # for display "-device", "qemu-xhci", # for keyboard "-device", "usb-kbd", # for keyboard "-device", "usb-tablet", # for mouse "-smp", str(os.cpu_count()), # for max cores "-drive", f"file={edk2}/edk2-aarch64-code.fd,if=pflash,format=raw,unit=0,readonly=on", "-drive", f"file={edk2}/edk2-arm-vars.fd,if=pflash,format=raw,unit=1,snapshot=on,readonly=off" ] else: qemu_args += [ "-bios", "/usr/share/edk2/aarch64/QEMU_EFI.fd", "-boot", "efi" ] else: exit_error(f"unsupported architecture {args.arch}") accel_enabled = True if 'kvm' in accel_list and os.path.exists("/dev/kvm"): qemu_args += ['-enable-kvm'] elif 'hvf' in accel_list: qemu_args += ['-accel', 'hvf'] else: accel_enabled = False print_verbose("Acceleration: off") qemu_args += [ "-m", str(args.memory), "-machine", machine, "-cpu", "host" if accel_enabled else default_cpu ] guestfwds="" if args.publish_dir: if shutil.which("netcat") is None: print("Command `netcat` not found in path, ignoring publish-dir") else: httpd_port = run_http_server(args.publish_dir) guestfwds = f"guestfwd=tcp:10.0.2.100:80-cmd:netcat 127.0.0.1 {httpd_port}," print_verbose(f"publishing {args.publish_dir} on http://10.0.2.100/") portfwd = { available_tcp_port(args.ssh_port): 22 } for local, remote in portfwd.items(): print_verbose(f"port: {local} → {remote}") fwds = [f"hostfwd=tcp::{h}-:{g}" for h, g in portfwd.items()] macstr = generate_mac_address() print_verbose(f"MAC: {macstr}") qemu_args += [ "-device", f"virtio-net-pci,netdev=n0,mac={macstr}", "-netdev", "user,id=n0,net=10.0.2.0/24," + guestfwds + ",".join(fwds), ] if args.nographics: qemu_args += ["-nographic"] runvm_id = random_id() tmpdir = tempfile.TemporaryDirectory(prefix=f"runvm-{runvm_id}") watchdog_pid = 0 if args.watchdog: qmp_socket_path = os.path.join(tmpdir.name, "qmp-socket") watch_socket_path = os.path.join(tmpdir.name, "watch-socket") qemu_args += [ "-qmp", f"unix:{qmp_socket_path},server=on,wait=off", "-device", "virtio-serial", "-chardev", f"socket,path={watch_socket_path},server=on,wait=off,id=watchdog", "-device", "virtserialport,chardev=watchdog,name=watchdog.0" ] watchdog_pid = os.fork() if watchdog_pid == 0: run_watchdog(watch_socket_path, qmp_socket_path) sys.exit(0) if args.tpm2: if shutil.which("swtpm") is None: exit_error("Command `swtpm` not found in path, this is needed for tpm2 support") tpm2_socket = os.path.join(tmpdir.name, "tpm-socket") if args.snapshot: tpm2_path = os.path.join(tmpdir.name, "tpm2_state") else: tpm2_path = ".tpm2_state" os.makedirs(tpm2_path, exist_ok=True) swtpm_args = ["swtpm", "socket", "--tpm2", "--tpmstate", f"dir={tpm2_path}", "--ctrl", f"type=unixio,path={tpm2_socket}" ] res = subprocess.Popen(swtpm_args) qemu_args += [ "-chardev", f"socket,id=chrtpm,path={tpm2_socket}", "-tpmdev", "emulator,id=tpm0,chardev=chrtpm", "-device", "tpm-tis,tpmdev=tpm0" ] print_verbose(f"Image: {args.image}") if args.image.endswith(".raw"): qemu_args += [ "-drive", f"file={args.image},index=0,media=disk,format=raw,if=virtio,snapshot={bool_arg(args.snapshot)}", ] else: # assume qcow2 qemu_args += [ "-drive", f"file={args.image},index=0,media=disk,format=qcow2,if=virtio,snapshot={bool_arg(args.snapshot)}", ] if args.cdrom: qemu_args += [ "-cdrom", args.cdrom, "-boot", "d" ] qemu_args += args.extra_args print_verbose(f"Running: {' '.join(qemu_args)}") try: res = subprocess.run(qemu_args, check=False) except KeyboardInterrupt: exit_error("Aborted") if watchdog_pid: os.kill(watchdog_pid, signal.SIGTERM) tmpdir.cleanup() return res.returncode if __name__ == "__main__": sys.exit(main())