3 import subprocess, time, os, re, argparse, json
4 from pathlib import PurePosixPath
8 subsitution = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*")
10 def do_expand(val, path):
15 iters = subsitution.finditer(val)
16 for _, m in enumerate(iters, start=1):
20 parts.append(val[prev_pos:start_pos])
22 key = val[start_pos + 1:end_pos]
23 if key not in g_lookup:
25 f"reference key {key} (config: {path}) is not defined")
27 parts.append(g_lookup[key])
30 parts.append(val[prev_pos:])
33 def get_config(opt, path, default=None, required=False):
34 _path = PurePosixPath(path)
40 raise Exception(f"config: {path} is required")
43 if not isinstance(opt, str):
46 return do_expand(opt, path)
def join_attrs(attrs):
    """Return the items of *attrs* joined into one comma-separated string.

    Used to build the ``key=value,key=value`` argument that follows a
    QEMU option such as ``-serial`` or ``-device``. Every item must
    already be a string.
    """
    separator = ","
    return separator.join(attrs)
def parse_protocol(opt):
    """Read the connection settings of a device config.

    Looks up "protocol" (default "telnet"), "addr" (default ":12345")
    and "logfile" in *opt* through get_config. No explicit default is
    given for "logfile", so get_config's own default (None) is passed.

    Returns a ``(link, logfile)`` tuple, where link is
    ``"<protocol>:<addr>"``.
    """
    # Lookups run in the same order as the keys are listed here.
    lookups = (
        ("protocol", "telnet"),
        ("addr", ":12345"),
        ("logfile", None),
    )
    protocol, addr, logfile = (
        get_config(opt, key, fallback) for key, fallback in lookups
    )

    link = f"{protocol}:{addr}"
    return (link, logfile)
58 class QEMUPeripherals:
59 def __init__(self, name, opt) -> None:
63 def get_qemu_opts(self) -> list:
66 class BasicSerialDevice(QEMUPeripherals):
67 def __init__(self, opt) -> None:
68 super().__init__("serial", opt)
70 def get_qemu_opts(self):
71 link, logfile = parse_protocol(self._opt)
73 cmds = [ link, "server", "nowait" ]
75 cmds.append(f"logfile={logfile}")
76 return [ "-serial", join_attrs(cmds) ]
78 class PCISerialDevice(QEMUPeripherals):
79 def __init__(self, opt) -> None:
80 super().__init__("pci-serial", opt)
82 def get_qemu_opts(self):
83 uniq = hex(self.__hash__())[2:]
84 name = f"chrdev.{uniq}"
85 cmds = [ "pci-serial", f"id=uart.{uniq}", f"chardev={name}" ]
86 chrdev = [ "file", f"id={name}" ]
88 logfile = get_config(self._opt, "logfile", required=True)
89 chrdev.append(f"path={logfile}")
92 "-chardev", join_attrs(chrdev),
93 "-device", join_attrs(cmds)
96 class AHCIBus(QEMUPeripherals):
97 def __init__(self, opt) -> None:
98 super().__init__("ahci", opt)
100 def get_qemu_opts(self):
102 name: str = get_config(opt, "name", required=True)
103 name = name.strip().replace(" ", "_")
104 cmds = [ "-device", f"ahci,id={name}" ]
106 for i, disk in enumerate(get_config(opt, "disks", default=[])):
107 d_type = get_config(disk, "type", default="ide-hd")
108 d_img = get_config(disk, "img", required=True)
109 d_ro = get_config(disk, "ro", default=False)
110 d_fmt = get_config(disk, "format", default="raw")
114 "-drive", join_attrs([
117 f"readonly={'on' if d_ro else 'off'}",
121 "-device", join_attrs([
130 class RTCDevice(QEMUPeripherals):
131 def __init__(self, opt) -> None:
132 super().__init__("rtc", opt)
134 def get_qemu_opts(self):
136 base = get_config(opt, "base", default="utc")
137 return [ "-rtc", f"base={base}" ]
139 class QEMUMonitor(QEMUPeripherals):
140 def __init__(self, opt) -> None:
141 super().__init__("monitor", opt)
143 def get_qemu_opts(self):
144 link, logfile = parse_protocol(self._opt)
147 "-monitor", join_attrs([
157 "basic_serial": BasicSerialDevice,
161 "pci-serial": PCISerialDevice
164 def __init__(self, options) -> None:
168 for dev in get_config(options, "devices", default=[]):
169 dev_class = get_config(dev, "class")
170 if dev_class not in QEMUExec.devices:
171 raise Exception(f"device class: {dev_class} is not defined")
173 self._devices.append(QEMUExec.devices[dev_class](dev))
175 def get_qemu_exec_name(self):
178 def get_qemu_arch_opts(self):
180 "-machine", get_config(self._opt, "machine"),
182 get_config(self._opt, "cpu/type", required=True),
183 *get_config(self._opt, "cpu/features", default=[]),
189 def get_qemu_debug_opts(self):
190 cmds = [ "-no-reboot", "-no-shutdown" ]
191 debug = get_config(self._opt, "debug")
196 cmds += [ "-gdb", f"tcp::{get_config(debug, 'gdb_port', default=1234)}" ]
198 trace_opts = get_config(debug, "traced", [])
199 for trace in trace_opts:
200 cmds += [ "--trace", f"{trace}"]
204 def get_qemu_general_opts(self):
206 "-m", get_config(self._opt, "memory", required=True),
207 "-smp", get_config(self._opt, "ncpu", default=1)
def add_peripheral(self, peripheral):
    """Append *peripheral* to this launcher's device list.

    start() later calls get_qemu_opts() on every entry in the list,
    so *peripheral* is expected to provide that method.
    """
    registered = self._devices
    registered.append(peripheral)
213 def start(self, qemu_dir_override=""):
214 qemu_path = self.get_qemu_exec_name()
215 qemu_path = os.path.join(qemu_dir_override, qemu_path)
218 *self.get_qemu_arch_opts(),
219 *self.get_qemu_debug_opts()
222 for dev in self._devices:
223 cmds += dev.get_qemu_opts()
225 print(" ".join(cmds), "\n")
227 handle = subprocess.Popen(cmds)
230 ret_code = handle.poll()
231 if ret_code is not None:
class QEMUx86Exec(QEMUExec):
    """QEMUExec variant that selects the x86 QEMU system binary."""

    def __init__(self, options) -> None:
        super().__init__(options)

    def get_qemu_exec_name(self):
        """Return the QEMU executable name for the configured "arch".

        "i386" and "x86_32" select the 32-bit emulator; any other value
        selects the 64-bit one.
        """
        arch = get_config(self._opt, "arch")
        is_32bit = arch in ["i386", "x86_32"]
        return "qemu-system-i386" if is_32bit else "qemu-system-x86_64"
248 arg = argparse.ArgumentParser()
250 arg.add_argument("config_file")
251 arg.add_argument("--qemu-dir", default="")
252 arg.add_argument("-v", "--values", action='append', default=[])
254 arg_opt = arg.parse_args()
257 with open(arg_opt.config_file, 'r') as f:
258 opts.update(json.loads(f.read()))
260 for kv in arg_opt.values:
261 [k, v] = kv.split('=')
264 arch = get_config(opts, "arch")
267 if arch in ["i386", "x86_32", "x86_64"]:
268 q = QEMUx86Exec(opts)
270 raise Exception(f"undefined arch: {arch}")
272 q.start(arg_opt.qemu_dir)
274 if __name__ == "__main__":
277 except Exception as e: