3 import subprocess, time, os, re, argparse, json
4 from pathlib import PurePosixPath
8 logger = logging.getLogger("auto_qemu")
9 logging.basicConfig(level=logging.INFO)
13 subsitution = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*")
15 def do_expand(val, path):
20 iters = subsitution.finditer(val)
21 for _, m in enumerate(iters, start=1):
25 parts.append(val[prev_pos:start_pos])
27 key = val[start_pos + 1:end_pos]
28 if key not in g_lookup:
30 f"reference key {key} (config: {path}) is not defined")
32 parts.append(g_lookup[key])
35 parts.append(val[prev_pos:])
38 def get_config(opt, path, default=None, required=False):
39 _path = PurePosixPath(path)
45 raise Exception(f"config: {path} is required")
48 if not isinstance(opt, str):
51 return do_expand(opt, path)
53 def join_attrs(attrs):
54 return ",".join(attrs)
57 return uuid.uuid4().hex[:8]
60 return "on" if b else "off"
64 #################################
65 # IO Backend Definitions
69 def __init__(self, opt, id_prefix="io") -> None:
70 self._type = get_config(opt, "type", required=True)
71 self._logfile = get_config(opt, "logfile")
72 self._id = f"{id_prefix}.{get_uniq()}"
74 def get_options(self):
77 opts.append(f"logfile={self._logfile}")
82 self._type, f"id={self._id}", *self.get_options()
88 class FileIOBackend(IOBackend):
89 def __init__(self, opt) -> None:
91 self.__path = get_config(opt, "path", required=True)
93 def get_options(self):
97 return opts + super().get_options()
99 class SocketIOBackend(IOBackend):
100 def __init__(self, opt) -> None:
101 super().__init__(opt)
102 self.__protocol = self._type
103 self._type = "socket"
104 self.__host = get_config(opt, "host", default="localhost")
105 self.__port = get_config(opt, "port", required=True)
106 self.__server = bool(get_config(opt, "server", True))
107 self.__wait = bool(get_config(opt, "wait", True))
109 def get_options(self):
111 f"host={self.__host}",
112 f"port={self.__port}",
113 f"server={map_bool(self.__server)}",
114 f"wait={map_bool(self.__wait)}",
116 if self.__protocol == "telnet":
117 opts.append("telnet=on")
118 if self.__protocol == "ws":
119 opts.append("websocket=on")
120 return opts + super().get_options()
122 def select_backend(opt):
123 bopt = get_config(opt, "io", required=True)
124 backend_type = get_config(bopt, "type", required=True)
126 if backend_type in ["telnet", "ws", "tcp"]:
127 return SocketIOBackend(bopt)
129 if backend_type in ["file", "pipe", "serial", "parallel"]:
130 return FileIOBackend(bopt)
132 return IOBackend(bopt)
136 #################################
137 # QEMU Emulated Device Definitions
140 class QEMUPeripherals:
141 def __init__(self, name, opt) -> None:
145 def get_qemu_opts(self) -> list:
148 class ISASerialDevice(QEMUPeripherals):
149 def __init__(self, opt) -> None:
150 super().__init__("isa-serial", opt)
152 def get_qemu_opts(self):
153 chardev = select_backend(self._opt)
157 f"id=com.{get_uniq()}",
158 f"chardev={chardev.name()}"
162 "-chardev", chardev.to_cmdline(),
163 "-device", join_attrs(cmds)
166 class PCISerialDevice(QEMUPeripherals):
167 def __init__(self, opt) -> None:
168 super().__init__("pci-serial", opt)
170 def get_qemu_opts(self):
171 chardev = select_backend(self._opt)
175 f"id=uart.{get_uniq()}",
176 f"chardev={chardev.name()}"
180 "-chardev", chardev.to_cmdline(),
181 "-device", join_attrs(cmds)
184 class AHCIBus(QEMUPeripherals):
185 def __init__(self, opt) -> None:
186 super().__init__("ahci", opt)
188 def __create_disklet(self, index, bus, opt):
189 d_type = get_config(opt, "type", default="ide-hd")
190 d_img = get_config(opt, "img", required=True)
191 d_ro = get_config(opt, "ro", default=False)
192 d_fmt = get_config(opt, "format", default="raw")
193 d_id = f"disk_{index}"
195 if not os.path.exists(d_img):
196 logger.warning(f"AHCI bus: {d_img} not exists, skipped")
200 "-drive", join_attrs([
203 f"readonly={'on' if d_ro else 'off'}",
207 "-device", join_attrs([
214 def get_qemu_opts(self):
216 name: str = get_config(opt, "name", required=True)
217 name = name.strip().replace(" ", "_")
218 cmds = [ "-device", f"ahci,id={name}" ]
220 disklets = get_config(opt, "disks", default=[])
221 for i, disk in enumerate(disklets):
222 cmds += self.__create_disklet(i, name, disk)
226 class RTCDevice(QEMUPeripherals):
227 def __init__(self, opt) -> None:
228 super().__init__("rtc", opt)
230 def get_qemu_opts(self):
232 base = get_config(opt, "base", default="utc")
233 return [ "-rtc", f"base={base}" ]
235 class QEMUMonitor(QEMUPeripherals):
236 def __init__(self, opt) -> None:
237 super().__init__("monitor", opt)
239 def get_qemu_opts(self):
241 chardev = select_backend(self._opt)
244 "-chardev", chardev.to_cmdline(),
253 "isa-serial": ISASerialDevice,
257 "pci-serial": PCISerialDevice
262 if name not in QEMUDevices.__devs:
263 raise Exception(f"device class: {name} is not defined")
264 return QEMUDevices.__devs[name]
268 #################################
269 # QEMU Machine Definitions
275 def __init__(self, options) -> None:
279 for dev in get_config(options, "devices", default=[]):
280 dev_class = get_config(dev, "class")
281 device = QEMUDevices.get(dev_class)
282 self._devices.append(device(dev))
284 def get_qemu_exec_name(self):
287 def get_qemu_arch_opts(self):
289 "-machine", get_config(self._opt, "machine"),
291 get_config(self._opt, "cpu/type", required=True),
292 *get_config(self._opt, "cpu/features", default=[]),
298 def get_qemu_debug_opts(self):
299 cmds = [ "-no-reboot", "-no-shutdown" ]
300 debug = get_config(self._opt, "debug")
305 cmds += [ "-gdb", f"tcp::{get_config(debug, 'gdb_port', default=1234)}" ]
307 trace_opts = get_config(debug, "traced", [])
308 for trace in trace_opts:
309 cmds += [ "--trace", f"{trace}"]
313 def get_qemu_general_opts(self):
315 "-m", get_config(self._opt, "memory", required=True),
316 "-smp", str(get_config(self._opt, "ncpu", default=1))
319 kopts = get_config(self._opt, "kernel")
322 "-kernel", get_config(kopts, "bin", required=True),
323 "-append", get_config(kopts, "cmd", required=True)
326 dtb = get_config(kopts, "dtb")
328 opts += [ "-dtb", dtb ]
332 def add_peripheral(self, peripheral):
333 self._devices.append(peripheral)
335 def start(self, qemu_dir_override="", dryrun=False, extras=[]):
336 qemu_path = self.get_qemu_exec_name()
337 qemu_path = os.path.join(qemu_dir_override, qemu_path)
341 *self.get_qemu_general_opts(),
342 *self.get_qemu_arch_opts(),
343 *self.get_qemu_debug_opts()
346 for dev in self._devices:
347 cmds += dev.get_qemu_opts()
349 logger.info(" ".join(cmds))
352 logger.info("[DRY RUN] QEMU not invoked")
355 handle = subprocess.Popen(cmds)
356 logger.info(f"QEMU launched (pid={handle.pid})")
359 ret_code = handle.poll()
360 if ret_code is not None:
364 class QEMUx86Exec(QEMUExec):
365 def __init__(self, options) -> None:
366 super().__init__(options)
368 def get_qemu_exec_name(self):
369 if get_config(self._opt, "arch") in ["i386", "x86_32"]:
370 return "qemu-system-i386"
372 return "qemu-system-x86_64"
377 arg = argparse.ArgumentParser()
379 arg.add_argument("config_file")
380 arg.add_argument("--qemu-dir", default="")
381 arg.add_argument("--dry", action='store_true')
382 arg.add_argument("-v", "--values", action='append', default=[])
384 arg_opt, extras = arg.parse_known_args()
387 with open(arg_opt.config_file, 'r') as f:
388 opts.update(json.loads(f.read()))
390 for kv in arg_opt.values:
391 splits = kv.split('=')
392 k, v = splits[0], "=".join(splits[1:])
395 arch = get_config(opts, "arch")
398 if arch in ["i386", "x86_32", "x86_64"]:
399 q = QEMUx86Exec(opts)
401 raise Exception(f"undefined arch: {arch}")
403 extras = [ x for x in extras if x != '--']
404 q.start(arg_opt.qemu_dir, arg_opt.dry, extras)
406 if __name__ == "__main__":