3 import subprocess, time, os, re, argparse, json
4 from pathlib import PurePosixPath
7 logger = logging.getLogger("auto_qemu")
11 subsitution = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*")
13 def do_expand(val, path):
18 iters = subsitution.finditer(val)
19 for _, m in enumerate(iters, start=1):
23 parts.append(val[prev_pos:start_pos])
25 key = val[start_pos + 1:end_pos]
26 if key not in g_lookup:
28 f"reference key {key} (config: {path}) is not defined")
30 parts.append(g_lookup[key])
33 parts.append(val[prev_pos:])
36 def get_config(opt, path, default=None, required=False):
37 _path = PurePosixPath(path)
43 raise Exception(f"config: {path} is required")
46 if not isinstance(opt, str):
49 return do_expand(opt, path)
def join_attrs(attrs):
    """Join option attributes into a single comma-separated string.

    ``attrs`` is an iterable of strings, for example
    ``["file", "id=chr0"]`` becomes ``"file,id=chr0"``.  An empty
    iterable yields the empty string.
    """
    separator = ","
    return separator.join(attrs)
def parse_protocol(opt):
    """Build the endpoint string and log file setting from a device config.

    Three keys are read from ``opt`` through ``get_config``, in this order:
    "protocol" (default "telnet"), "addr" (default ":12345") and "logfile"
    (no explicit default).

    Returns:
        A 2-tuple ``(endpoint, logfile)`` where ``endpoint`` is
        ``"<protocol>:<addr>"`` and ``logfile`` is whatever ``get_config``
        produced for the "logfile" key.
    """
    proto = get_config(opt, "protocol", "telnet")
    address = get_config(opt, "addr", ":12345")
    log_path = get_config(opt, "logfile")

    # str.format with empty specs formats values exactly like the f-string
    # form, so non-string config values are still accepted.
    endpoint = "{}:{}".format(proto, address)
    return endpoint, log_path
61 class QEMUPeripherals:
62 def __init__(self, name, opt) -> None:
66 def get_qemu_opts(self) -> list:
69 class BasicSerialDevice(QEMUPeripherals):
70 def __init__(self, opt) -> None:
71 super().__init__("serial", opt)
73 def get_qemu_opts(self):
74 link, logfile = parse_protocol(self._opt)
76 cmds = [ link, "server", "nowait" ]
78 cmds.append(f"logfile={logfile}")
79 return [ "-serial", join_attrs(cmds) ]
81 class PCISerialDevice(QEMUPeripherals):
82 def __init__(self, opt) -> None:
83 super().__init__("pci-serial", opt)
85 def get_qemu_opts(self):
86 uniq = hex(self.__hash__())[2:]
87 name = f"chrdev.{uniq}"
88 cmds = [ "pci-serial", f"id=uart.{uniq}", f"chardev={name}" ]
89 chrdev = [ "file", f"id={name}" ]
91 logfile = get_config(self._opt, "logfile", required=True)
92 chrdev.append(f"path={logfile}")
95 "-chardev", join_attrs(chrdev),
96 "-device", join_attrs(cmds)
99 class AHCIBus(QEMUPeripherals):
100 def __init__(self, opt) -> None:
101 super().__init__("ahci", opt)
103 def get_qemu_opts(self):
105 name: str = get_config(opt, "name", required=True)
106 name = name.strip().replace(" ", "_")
107 cmds = [ "-device", f"ahci,id={name}" ]
109 for i, disk in enumerate(get_config(opt, "disks", default=[])):
110 d_type = get_config(disk, "type", default="ide-hd")
111 d_img = get_config(disk, "img", required=True)
112 d_ro = get_config(disk, "ro", default=False)
113 d_fmt = get_config(disk, "format", default="raw")
116 if not os.path.exists(d_img):
117 logger.warning(f"AHCI bus: {d_img} not exists, disk skipped")
121 "-drive", join_attrs([
124 f"readonly={'on' if d_ro else 'off'}",
128 "-device", join_attrs([
137 class RTCDevice(QEMUPeripherals):
138 def __init__(self, opt) -> None:
139 super().__init__("rtc", opt)
141 def get_qemu_opts(self):
143 base = get_config(opt, "base", default="utc")
144 return [ "-rtc", f"base={base}" ]
146 class QEMUMonitor(QEMUPeripherals):
147 def __init__(self, opt) -> None:
148 super().__init__("monitor", opt)
150 def get_qemu_opts(self):
151 link, logfile = parse_protocol(self._opt)
154 "-monitor", join_attrs([
164 "basic_serial": BasicSerialDevice,
168 "pci-serial": PCISerialDevice
171 def __init__(self, options) -> None:
175 for dev in get_config(options, "devices", default=[]):
176 dev_class = get_config(dev, "class")
177 if dev_class not in QEMUExec.devices:
178 raise Exception(f"device class: {dev_class} is not defined")
180 self._devices.append(QEMUExec.devices[dev_class](dev))
182 def get_qemu_exec_name(self):
185 def get_qemu_arch_opts(self):
187 "-machine", get_config(self._opt, "machine"),
189 get_config(self._opt, "cpu/type", required=True),
190 *get_config(self._opt, "cpu/features", default=[]),
196 def get_qemu_debug_opts(self):
197 cmds = [ "-no-reboot", "-no-shutdown" ]
198 debug = get_config(self._opt, "debug")
203 cmds += [ "-gdb", f"tcp::{get_config(debug, 'gdb_port', default=1234)}" ]
205 trace_opts = get_config(debug, "traced", [])
206 for trace in trace_opts:
207 cmds += [ "--trace", f"{trace}"]
def get_qemu_general_opts(self):
    """Return the memory-size and CPU-count arguments for the QEMU command.

    Reads "memory" (looked up with ``required=True``) and "ncpu"
    (default 1) from ``self._opt`` through ``get_config``.

    Returns:
        ``["-m", <memory>, "-smp", <ncpu>]`` with both values as strings.
    """
    memory = get_config(self._opt, "memory", required=True)
    ncpu = get_config(self._opt, "ncpu", default=1)

    # The config may supply these as numbers (e.g. "memory": 1024), and every
    # element of the command list has to be a string: the list is joined with
    # " ".join() for display and handed to subprocess.Popen.  Previously only
    # "ncpu" was converted; convert "memory" the same way.
    return ["-m", str(memory), "-smp", str(ncpu)]
def add_peripheral(self, peripheral):
    """Register an additional peripheral on this instance.

    ``peripheral`` is appended to ``self._devices``.  Nothing is returned.
    NOTE(review): entries of ``self._devices`` appear to be asked for
    ``get_qemu_opts()`` when the command line is assembled -- confirm the
    object passed here provides it.
    """
    registered = self._devices
    registered.append(peripheral)
220 def start(self, qemu_dir_override=""):
221 qemu_path = self.get_qemu_exec_name()
222 qemu_path = os.path.join(qemu_dir_override, qemu_path)
225 *self.get_qemu_general_opts(),
226 *self.get_qemu_arch_opts(),
227 *self.get_qemu_debug_opts()
230 for dev in self._devices:
231 cmds += dev.get_qemu_opts()
233 print(" ".join(cmds), "\n")
235 handle = subprocess.Popen(cmds)
238 ret_code = handle.poll()
239 if ret_code is not None:
class QEMUx86Exec(QEMUExec):
    """QEMU launcher variant that selects the x86 system emulator binary."""

    def __init__(self, options) -> None:
        """Forward ``options`` unchanged to the ``QEMUExec`` initialiser."""
        super().__init__(options)

    def get_qemu_exec_name(self):
        """Return the emulator executable name for the configured "arch".

        "i386" and "x86_32" map to ``qemu-system-i386``; every other value
        (including a missing "arch") maps to ``qemu-system-x86_64``.
        """
        arch = get_config(self._opt, "arch")
        wants_32bit = arch in ("i386", "x86_32")
        return "qemu-system-i386" if wants_32bit else "qemu-system-x86_64"
256 arg = argparse.ArgumentParser()
258 arg.add_argument("config_file")
259 arg.add_argument("--qemu-dir", default="")
260 arg.add_argument("-v", "--values", action='append', default=[])
262 arg_opt = arg.parse_args()
265 with open(arg_opt.config_file, 'r') as f:
266 opts.update(json.loads(f.read()))
268 for kv in arg_opt.values:
269 [k, v] = kv.split('=')
272 arch = get_config(opts, "arch")
275 if arch in ["i386", "x86_32", "x86_64"]:
276 q = QEMUx86Exec(opts)
278 raise Exception(f"undefined arch: {arch}")
280 q.start(arg_opt.qemu_dir)
282 if __name__ == "__main__":