X-Git-Url: https://scm.lunaixsky.com/lunaix-os.git/blobdiff_plain/ebb55b7e5f0b8f31328950ec383b77b208ffbb64..2d0cfde0212549bb2745688fadbb9c37652d3ab2:/lunaix-os/scripts/qemu.py diff --git a/lunaix-os/scripts/qemu.py b/lunaix-os/scripts/qemu.py index ce843da..edd5cdc 100755 --- a/lunaix-os/scripts/qemu.py +++ b/lunaix-os/scripts/qemu.py @@ -2,6 +2,11 @@ import subprocess, time, os, re, argparse, json from pathlib import PurePosixPath +import logging +import uuid + +logger = logging.getLogger("auto_qemu") +logging.basicConfig(level=logging.INFO) g_lookup = {} @@ -48,12 +53,89 @@ def get_config(opt, path, default=None, required=False): def join_attrs(attrs): return ",".join(attrs) -def parse_protocol(opt): - protocol = get_config(opt, "protocol", "telnet") - addr = get_config(opt, "addr", ":12345") - logfile = get_config(opt, "logfile") +def get_uniq(): + return uuid.uuid4().hex[:8] + +def map_bool(b): + return "on" if b else "off" + + + +################################# +# IO Backend Definitions +# + +class IOBackend: + def __init__(self, opt, id_prefix="io") -> None: + self._type = get_config(opt, "type", required=True) + self._logfile = get_config(opt, "logfile") + self._id = f"{id_prefix}.{get_uniq()}" + + def get_options(self): + opts = [] + if self._logfile: + opts.append(f"logfile={self._logfile}") + return opts + + def to_cmdline(self): + return join_attrs([ + self._type, f"id={self._id}", *self.get_options() + ]) + + def name(self): + return self._id + +class FileIOBackend(IOBackend): + def __init__(self, opt) -> None: + super().__init__(opt) + self.__path = get_config(opt, "path", required=True) + + def get_options(self): + opts = [ + f"path={self.__path}" + ] + return opts + super().get_options() + +class SocketIOBackend(IOBackend): + def __init__(self, opt) -> None: + super().__init__(opt) + self.__protocol = self._type + self._type = "socket" + self.__host = get_config(opt, "host", default="localhost") + self.__port = get_config(opt, "port", required=True) + self.__server = bool(get_config(opt, "server", True)) + self.__wait = bool(get_config(opt, "wait", True)) + + def get_options(self): + opts = [ + f"host={self.__host}", + f"port={self.__port}", + f"server={map_bool(self.__server)}", + f"wait={map_bool(self.__wait)}", + ] + if self.__protocol == "telnet": + opts.append("telnet=on") + if self.__protocol == "ws": + opts.append("websocket=on") + return opts + super().get_options() + +def select_backend(opt): + bopt = get_config(opt, "io", required=True) + backend_type = get_config(bopt, "type", required=True) + + if backend_type in ["telnet", "ws", "tcp"]: + return SocketIOBackend(bopt) + + if backend_type in ["file", "pipe", "serial", "parallel"]: + return FileIOBackend(bopt) + + return IOBackend(bopt) - return (f"{protocol}:{addr}", logfile) + + +################################# +# QEMU Emulated Device Definitions +# class QEMUPeripherals: def __init__(self, name, opt) -> None: @@ -63,39 +145,71 @@ class QEMUPeripherals: def get_qemu_opts(self) -> list: pass -class BasicSerialDevice(QEMUPeripherals): +class ISASerialDevice(QEMUPeripherals): def __init__(self, opt) -> None: - super().__init__("serial", opt) + super().__init__("isa-serial", opt) def get_qemu_opts(self): - link, logfile = parse_protocol(self._opt) + chardev = select_backend(self._opt) + + cmds = [ + "isa-serial", + f"id=com.{get_uniq()}", + f"chardev={chardev.name()}" + ] - cmds = [ link, "server", "nowait" ] - if logfile: - cmds.append(f"logfile={logfile}") - return [ "-serial", join_attrs(cmds) ] + return [ + "-chardev", chardev.to_cmdline(), + "-device", join_attrs(cmds) + ] class PCISerialDevice(QEMUPeripherals): def __init__(self, opt) -> None: super().__init__("pci-serial", opt) def get_qemu_opts(self): - uniq = hex(self.__hash__())[2:] - name = f"chrdev.{uniq}" - cmds = [ "pci-serial", f"id=uart.{uniq}", f"chardev={name}" ] - chrdev = [ "file", f"id={name}" ] - - logfile = get_config(self._opt, "logfile", required=True) - chrdev.append(f"path={logfile}") + chardev = select_backend(self._opt) + + cmds = [ + "pci-serial", + f"id=uart.{get_uniq()}", + f"chardev={chardev.name()}" + ] return [ - "-chardev", join_attrs(chrdev), + "-chardev", chardev.to_cmdline(), "-device", join_attrs(cmds) ] class AHCIBus(QEMUPeripherals): def __init__(self, opt) -> None: super().__init__("ahci", opt) + + def __create_disklet(self, index, bus, opt): + d_type = get_config(opt, "type", default="ide-hd") + d_img = get_config(opt, "img", required=True) + d_ro = get_config(opt, "ro", default=False) + d_fmt = get_config(opt, "format", default="raw") + d_id = f"disk_{index}" + + if not os.path.exists(d_img): + logger.warning(f"AHCI bus: {d_img} not exists, skipped") + return [] + + return [ + "-drive", join_attrs([ + f"id={d_id}", + f"file={d_img}", + f"readonly={'on' if d_ro else 'off'}", + f"if=none", + f"format={d_fmt}" + ]), + "-device", join_attrs([ + d_type, + f"drive={d_id}", + f"bus={bus}.{index}" + ]) + ] def get_qemu_opts(self): opt = self._opt @@ -103,27 +217,9 @@ class AHCIBus(QEMUPeripherals): name = name.strip().replace(" ", "_") cmds = [ "-device", f"ahci,id={name}" ] - for i, disk in enumerate(get_config(opt, "disks", default=[])): - d_type = get_config(disk, "type", default="ide-hd") - d_img = get_config(disk, "img", required=True) - d_ro = get_config(disk, "ro", default=False) - d_fmt = get_config(disk, "format", default="raw") - d_id = f"disk_{i}" - - cmds += [ - "-drive", join_attrs([ - f"id={d_id}," - f"file={d_img}", - f"readonly={'on' if d_ro else 'off'}", - f"if=none", - f"format={d_fmt}" - ]), - "-device", join_attrs([ - d_type, - f"drive={d_id}", - f"bus={name}.{i}" - ]) - ] + disklets = get_config(opt, "disks", default=[]) + for i, disk in enumerate(disklets): + cmds += self.__create_disklet(i, name, disk) return cmds @@ -141,36 +237,49 @@ class QEMUMonitor(QEMUPeripherals): super().__init__("monitor", opt) def get_qemu_opts(self): - link, logfile = parse_protocol(self._opt) + + chardev = select_backend(self._opt) return [ - "-monitor", join_attrs([ - link, - "server", - "nowait", - f"logfile={logfile}" + "-chardev", chardev.to_cmdline(), + "-mon", join_attrs([ + chardev.name(), + "mode=readline", ]) ] -class QEMUExec: - devices = { - "basic_serial": BasicSerialDevice, +class QEMUDevices: + __devs = { + "isa-serial": ISASerialDevice, "ahci": AHCIBus, "rtc": RTCDevice, "hmp": QEMUMonitor, "pci-serial": PCISerialDevice } + @staticmethod + def get(name): + if name not in QEMUDevices.__devs: + raise Exception(f"device class: {name} is not defined") + return QEMUDevices.__devs[name] + + + +################################# +# QEMU Machine Definitions +# + +class QEMUExec: + + def __init__(self, options) -> None: self._opt = options self._devices = [] for dev in get_config(options, "devices", default=[]): dev_class = get_config(dev, "class") - if dev_class not in QEMUExec.devices: - raise Exception(f"device class: {dev_class} is not defined") - - self._devices.append(QEMUExec.devices[dev_class](dev)) + device = QEMUDevices.get(dev_class) + self._devices.append(device(dev)) def get_qemu_exec_name(self): pass @@ -202,19 +311,33 @@ class QEMUExec: return cmds def get_qemu_general_opts(self): - return [ + opts = [ "-m", get_config(self._opt, "memory", required=True), - "-smp", get_config(self._opt, "ncpu", default=1) + "-smp", str(get_config(self._opt, "ncpu", default=1)) ] + kopts = get_config(self._opt, "kernel") + if kopts: + opts += [ + "-kernel", get_config(kopts, "bin", required=True), + "-append", get_config(kopts, "cmd", required=True) + ] + + dtb = get_config(kopts, "dtb") + if dtb: + opts += [ "-dtb", dtb ] + + return opts + def add_peripheral(self, peripheral): self._devices.append(peripheral) - def start(self, qemu_dir_override=""): + def start(self, qemu_dir_override="", dryrun=False, extras=[]): qemu_path = self.get_qemu_exec_name() qemu_path = os.path.join(qemu_dir_override, qemu_path) cmds = [ qemu_path, + *self.get_qemu_general_opts(), *self.get_qemu_arch_opts(), *self.get_qemu_debug_opts() ] @@ -222,9 +345,15 @@ class QEMUExec: for dev in self._devices: cmds += dev.get_qemu_opts() - print(" ".join(cmds), "\n") + cmds += extras + logger.info(" ".join(cmds)) + + if dryrun: + logger.info("[DRY RUN] QEMU not invoked") + return handle = subprocess.Popen(cmds) + logger.info(f"QEMU launched (pid={handle.pid})") while True: ret_code = handle.poll() @@ -249,16 +378,18 @@ def main(): arg.add_argument("config_file") arg.add_argument("--qemu-dir", default="") + arg.add_argument("--dry", action='store_true') arg.add_argument("-v", "--values", action='append', default=[]) - arg_opt = arg.parse_args() + arg_opt, extras = arg.parse_known_args() opts = {} with open(arg_opt.config_file, 'r') as f: opts.update(json.loads(f.read())) for kv in arg_opt.values: - [k, v] = kv.split('=') + splits = kv.split('=') + k, v = splits[0], "=".join(splits[1:]) g_lookup[k] = v arch = get_config(opts, "arch") @@ -269,10 +400,8 @@ def main(): else: raise Exception(f"undefined arch: {arch}") - q.start(arg_opt.qemu_dir) + extras = [ x for x in extras if x != '--'] + q.start(arg_opt.qemu_dir, arg_opt.dry, extras) if __name__ == "__main__": - try: - main() - except Exception as e: - print(e) \ No newline at end of file + main() \ No newline at end of file