3 import subprocess, time, os, re, argparse, json
4 from pathlib import PurePosixPath
8 subsitution = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*")
def do_expand(val, path):
    """Replace every ``$NAME`` reference in *val* using the ``g_lookup`` table.

    References are found with the module-level ``subsitution`` pattern
    (``$`` followed by an identifier). Text outside the references is
    copied through unchanged.

    Args:
        val: String that may contain ``$NAME`` references.
        path: Config path the string came from; used only in the error
            message.

    Returns:
        *val* with each reference replaced by ``g_lookup[NAME]``.

    Raises:
        Exception: If a referenced name is not a key of ``g_lookup``.
    """

    def _resolve(match):
        # The match text is "$NAME"; strip the "$" to get the lookup key.
        key = match.group(0)[1:]
        if key not in g_lookup:
            raise Exception(
                f"reference key {key} (config: {path}) is not defined")
        return g_lookup[key]

    # With a callable replacement, re.sub inserts the returned text
    # verbatim (no backslash-escape processing), which matches plain
    # slice-and-join substitution.
    return subsitution.sub(_resolve, val)
def get_config(opt, path, default=None, required=False):
    """Look up a ``/``-separated *path* inside a nested config dict.

    Args:
        opt: Root config object (nested dicts as produced by JSON parsing).
        path: Key path such as ``"cpu/type"``; each component selects one
            nesting level.
        default: Value returned as-is when the path cannot be resolved and
            *required* is false.
        required: When true, an unresolved path raises instead of
            returning *default*.

    Returns:
        The value found at *path*. String values are passed through
        ``do_expand`` so ``$NAME`` references are substituted; every other
        type is returned unchanged.

    Raises:
        Exception: If *required* is true and the path cannot be resolved.
    """
    node = opt
    for part in PurePosixPath(path).parts:
        # Only descend into real dicts: a path that runs into a scalar,
        # list or string counts as "not found" rather than triggering a
        # substring test or a TypeError on indexing.
        if isinstance(node, dict) and part in node:
            node = node[part]
            continue
        if required:
            raise Exception(f"config: {path} is required")
        return default

    if not isinstance(node, str):
        return node

    return do_expand(node, path)
def join_attrs(attrs):
    """Join an iterable of attribute strings into one comma-separated string."""
    separator = ","
    return separator.join(attrs)
def parse_protocol(opt):
    """Read the link settings of a device config.

    Args:
        opt: Device config; the keys ``protocol`` (default ``"telnet"``),
            ``addr`` (default ``":12345"``) and ``logfile`` (default
            ``None``) are read through ``get_config``.

    Returns:
        A ``(link, logfile)`` tuple where *link* is ``"<protocol>:<addr>"``.
    """
    scheme = get_config(opt, "protocol", "telnet")
    address = get_config(opt, "addr", ":12345")
    log_path = get_config(opt, "logfile")

    endpoint = f"{scheme}:{address}"
    return (endpoint, log_path)
58 class QEMUPeripherals:
59 def __init__(self, name, opt) -> None:
63 def get_qemu_opts(self) -> list:
class BasicSerialDevice(QEMUPeripherals):
    """Peripheral registered as "serial" that contributes a ``-serial`` option."""

    def __init__(self, opt) -> None:
        super().__init__("serial", opt)

    def get_qemu_opts(self):
        """Return ``["-serial", "<link>,server,nowait[,logfile=<path>]"]``.

        The link and the optional log file come from ``parse_protocol``
        applied to this device's config.
        """
        endpoint, log_path = parse_protocol(self._opt)

        attrs = [endpoint, "server", "nowait"]
        # The logfile attribute is only emitted when one is configured.
        if log_path:
            attrs.append(f"logfile={log_path}")

        return ["-serial", join_attrs(attrs)]
class PCISerialDevice(QEMUPeripherals):
    """Peripheral registered as "pci-serial", backed by a file chardev."""

    def __init__(self, opt) -> None:
        super().__init__("pci-serial", opt)

    def get_qemu_opts(self):
        """Return the ``-chardev`` / ``-device`` option pair for this device.

        Raises:
            Exception: If the device config has no ``logfile`` entry
                (raised by ``get_config`` with ``required=True``).
        """
        # The chardev id is derived from this object's hash so the
        # -chardev and -device options refer to the same id.
        chardev_id = f"chrdev.{hex(self.__hash__())[2:]}"
        device_attrs = ["pci-serial", f"chardev={chardev_id}"]

        log_path = get_config(self._opt, "logfile", required=True)
        backend_attrs = ["file", f"id={chardev_id}", f"path={log_path}"]

        return [
            "-chardev", join_attrs(backend_attrs),
            "-device", join_attrs(device_attrs),
        ]
95 class AHCIBus(QEMUPeripherals):
96 def __init__(self, opt) -> None:
97 super().__init__("ahci", opt)
99 def get_qemu_opts(self):
101 name: str = get_config(opt, "name", required=True)
102 name = name.strip().replace(" ", "_")
103 cmds = [ "-device", f"ahci,id={name}" ]
105 for i, disk in enumerate(get_config(opt, "disks", default=[])):
106 d_type = get_config(disk, "type", default="ide-hd")
107 d_img = get_config(disk, "img", required=True)
108 d_ro = get_config(disk, "ro", default=False)
109 d_fmt = get_config(disk, "format", default="raw")
113 "-drive", join_attrs([
116 f"readonly={'on' if d_ro else 'off'}",
120 "-device", join_attrs([
class RTCDevice(QEMUPeripherals):
    """Peripheral registered as "rtc" that contributes a ``-rtc`` option."""

    def __init__(self, opt) -> None:
        super().__init__("rtc", opt)

    def get_qemu_opts(self):
        """Return ``["-rtc", "base=<base>"]``; ``base`` defaults to ``"utc"``."""
        rtc_base = get_config(self._opt, "base", default="utc")
        return ["-rtc", f"base={rtc_base}"]
138 class QEMUMonitor(QEMUPeripherals):
139 def __init__(self, opt) -> None:
140 super().__init__("monitor", opt)
142 def get_qemu_opts(self):
143 link, logfile = parse_protocol(self._opt)
146 "-monitor", join_attrs([
156 "basic_serial": BasicSerialDevice,
160 "pci-serial": PCISerialDevice
def __init__(self, options) -> None:
    """Store the config and instantiate every peripheral listed in it.

    Each entry of the ``devices`` list is mapped through the
    ``QEMUExec.devices`` table by its ``class`` key and constructed with
    the entry itself as its config.

    Raises:
        Exception: If an entry names a class missing from
            ``QEMUExec.devices``.
    """
    self._opt = options
    self._devices = []

    registry = QEMUExec.devices
    for entry in get_config(options, "devices", default=[]):
        kind = get_config(entry, "class")
        if kind not in registry:
            raise Exception(f"device class: {kind} is not defined")

        factory = registry[kind]
        self._devices.append(factory(entry))
174 def get_qemu_exec_name(self):
177 def get_qemu_arch_opts(self):
179 "-machine", get_config(self._opt, "machine"),
181 get_config(self._opt, "cpu/type", required=True),
182 *get_config(self._opt, "cpu/features", default=[]),
188 def get_qemu_debug_opts(self):
189 cmds = [ "-no-reboot", "-no-shutdown" ]
190 debug = get_config(self._opt, "debug")
195 cmds += [ "-gdb", f"tcp::{get_config(debug, 'gdb_port', default=1234)}" ]
197 trace_opts = get_config(debug, "traced", [])
198 for trace in trace_opts:
199 cmds += [ "-d", f"trace:{trace}"]
def get_qemu_general_opts(self):
    """Return the ``-m`` / ``-smp`` options taken from the config.

    Returns:
        ``["-m", <memory>, "-smp", <ncpu>]`` with both values as strings;
        ``ncpu`` defaults to 1.

    Raises:
        Exception: If the config has no ``memory`` entry (raised by
            ``get_config`` with ``required=True``).
    """
    memory = get_config(self._opt, "memory", required=True)
    ncpu = get_config(self._opt, "ncpu", default=1)

    # JSON numbers (and the int default for ncpu) are not strings, but
    # these values end up in an argv list that is joined for printing and
    # handed to subprocess, both of which need str items.
    return [
        "-m", str(memory),
        "-smp", str(ncpu),
    ]
def add_peripheral(self, peripheral):
    """Append *peripheral* to this instance's device list."""
    registered = self._devices
    registered.append(peripheral)
212 def start(self, qemu_dir_override=""):
213 qemu_path = self.get_qemu_exec_name()
214 qemu_path = os.path.join(qemu_dir_override, qemu_path)
217 *self.get_qemu_arch_opts(),
218 *self.get_qemu_debug_opts()
221 for dev in self._devices:
222 cmds += dev.get_qemu_opts()
224 print(" ".join(cmds), "\n")
226 handle = subprocess.Popen(cmds)
229 ret_code = handle.poll()
230 if ret_code is not None:
class QEMUx86Exec(QEMUExec):
    """QEMU launcher that picks the x86 system emulator from the ``arch`` config."""

    def __init__(self, options) -> None:
        super().__init__(options)

    def get_qemu_exec_name(self):
        """Return ``qemu-system-i386`` for i386/x86_32, else ``qemu-system-x86_64``."""
        arch = get_config(self._opt, "arch")
        is_32bit = arch in ["i386", "x86_32"]
        return "qemu-system-i386" if is_32bit else "qemu-system-x86_64"
247 arg = argparse.ArgumentParser()
249 arg.add_argument("config_file")
250 arg.add_argument("--qemu-dir", default="")
251 arg.add_argument("-v", "--values", action='append', default=[])
253 arg_opt = arg.parse_args()
256 with open(arg_opt.config_file, 'r') as f:
257 opts.update(json.loads(f.read()))
259 for kv in arg_opt.values:
260 [k, v] = kv.split('=')
263 arch = get_config(opts, "arch")
266 if arch in ["i386", "x86_32", "x86_64"]:
267 q = QEMUx86Exec(opts)
269 raise Exception(f"undefined arch: {arch}")
271 q.start(arg_opt.qemu_dir)
273 if __name__ == "__main__":
276 except Exception as e: