refine the live_debug.sh, move gdb to new tmux window
[lunaix-os.git] / lunaix-os / scripts / qemu.py
1 #!/usr/bin/env python 
2
3 import subprocess, time, os, re, argparse, json
4 from pathlib import PurePosixPath
5 import logging
6 import uuid
7
8 logger = logging.getLogger("auto_qemu")
9 logging.basicConfig(level=logging.INFO)
10
11 g_lookup = {}
12
13 subsitution = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*")
14
15 def do_expand(val, path):
16     global g_lookup
17
18     parts = []
19     prev_pos = 0
20     iters = subsitution.finditer(val)
21     for _, m in enumerate(iters, start=1):
22         start_pos = m.start()
23         end_pos = m.end()
24
25         parts.append(val[prev_pos:start_pos])
26
27         key = val[start_pos + 1:end_pos]
28         if key not in g_lookup:
29             raise Exception(
30                 f"reference key {key} (config: {path}) is not defined")
31
32         parts.append(g_lookup[key])
33         prev_pos = end_pos
34
35     parts.append(val[prev_pos:])
36     return "".join(parts)
37
38 def get_config(opt, path, default=None, required=False):
39     _path = PurePosixPath(path)
40     for p in _path.parts:
41         if p in opt:
42             opt = opt[p]
43             continue
44         if required:
45             raise Exception(f"config: {path} is required")
46         return default
47         
48     if not isinstance(opt, str):
49         return opt
50
51     return do_expand(opt, path)
52
53 def join_attrs(attrs):
54     return ",".join(attrs)
55
56 def get_uniq():
57     return uuid.uuid4().hex[:8]
58
59 def map_bool(b):
60     return "on" if b else "off"
61     
62
63
64 #################################
65 # IO Backend Definitions  
66 #
67
68 class IOBackend:
69     def __init__(self, opt, id_prefix="io") -> None:
70         self._type = get_config(opt, "type", required=True)
71         self._logfile = get_config(opt, "logfile")
72         self._id = f"{id_prefix}.{get_uniq()}"
73
74     def get_options(self):
75         opts = []
76         if self._logfile:
77             opts.append(f"logfile={self._logfile}")
78         return opts
79     
80     def to_cmdline(self):
81         return join_attrs([
82             self._type, f"id={self._id}", *self.get_options()
83         ])
84     
85     def name(self):
86         return self._id
87
88 class FileIOBackend(IOBackend):
89     def __init__(self, opt) -> None:
90         super().__init__(opt)
91         self.__path = get_config(opt, "path", required=True)
92
93     def get_options(self):
94         opts = [
95             f"path={self.__path}"
96         ]
97         return opts + super().get_options()
98
99 class SocketIOBackend(IOBackend):
100     def __init__(self, opt) -> None:
101         super().__init__(opt)
102         self.__protocol = self._type
103         self._type = "socket"
104         self.__host = get_config(opt, "host", default="localhost")
105         self.__port = get_config(opt, "port", required=True)
106         self.__server = bool(get_config(opt, "server", True))
107         self.__wait = bool(get_config(opt, "wait", True))
108
109     def get_options(self):
110         opts = [
111             f"host={self.__host}",
112             f"port={self.__port}",
113             f"server={map_bool(self.__server)}",
114             f"wait={map_bool(self.__wait)}",
115         ]
116         if self.__protocol == "telnet":
117             opts.append("telnet=on")
118         if self.__protocol == "ws":
119             opts.append("websocket=on")
120         return opts + super().get_options()
121     
122 def select_backend(opt):
123     bopt = get_config(opt, "io", required=True)
124     backend_type = get_config(bopt, "type", required=True)
125     
126     if backend_type in ["telnet", "ws", "tcp"]:
127         return SocketIOBackend(bopt)
128     
129     if backend_type in ["file", "pipe", "serial", "parallel"]:
130         return FileIOBackend(bopt)
131     
132     return IOBackend(bopt)
133
134
135
136 #################################
137 # QEMU Emulated Device Definitions
138 #
139
140 class QEMUPeripherals:
141     def __init__(self, name, opt) -> None:
142         self.name = name
143         self._opt = opt
144
145     def get_qemu_opts(self) -> list:
146         pass
147
148 class ISASerialDevice(QEMUPeripherals):
149     def __init__(self, opt) -> None:
150         super().__init__("isa-serial", opt)
151
152     def get_qemu_opts(self):
153         chardev = select_backend(self._opt)
154
155         cmds = [ 
156             "isa-serial", 
157            f"id=com.{get_uniq()}", 
158            f"chardev={chardev.name()}" 
159         ]
160
161         return [ 
162             "-chardev", chardev.to_cmdline(),
163             "-device", join_attrs(cmds)
164          ]
165     
166 class PCISerialDevice(QEMUPeripherals):
167     def __init__(self, opt) -> None:
168         super().__init__("pci-serial", opt)
169
170     def get_qemu_opts(self):
171         chardev = select_backend(self._opt)
172
173         cmds = [ 
174             "pci-serial", 
175            f"id=uart.{get_uniq()}", 
176            f"chardev={chardev.name()}" 
177         ]
178
179         return [ 
180             "-chardev", chardev.to_cmdline(),
181             "-device", join_attrs(cmds)
182          ]
183     
184 class AHCIBus(QEMUPeripherals):
185     def __init__(self, opt) -> None:
186         super().__init__("ahci", opt)
187         
188     def __create_disklet(self, index, bus, opt):
189         d_type = get_config(opt, "type",   default="ide-hd")
190         d_img  = get_config(opt, "img",    required=True)
191         d_ro   = get_config(opt, "ro",     default=False)
192         d_fmt  = get_config(opt, "format", default="raw")
193         d_id   = f"disk_{index}"
194
195         if not os.path.exists(d_img):
196             logger.warning(f"AHCI bus: {d_img} not exists, skipped")
197             return []
198         
199         return [
200             "-drive", join_attrs([
201                 f"id={d_id}",
202                 f"file={d_img}",
203                 f"readonly={'on' if d_ro else 'off'}",
204                 f"if=none",
205                 f"format={d_fmt}"
206             ]),
207             "-device", join_attrs([
208                 d_type,
209                 f"drive={d_id}",
210                 f"bus={bus}.{index}"
211             ])
212         ]
213
214     def get_qemu_opts(self):
215         opt = self._opt
216         name: str = get_config(opt, "name", required=True)
217         name = name.strip().replace(" ", "_")
218         cmds = [ "-device", f"ahci,id={name}" ]
219
220         disklets = get_config(opt, "disks", default=[])
221         for i, disk in enumerate(disklets):
222             cmds += self.__create_disklet(i, name, disk)
223         
224         return cmds
225     
226 class RTCDevice(QEMUPeripherals):
227     def __init__(self, opt) -> None:
228         super().__init__("rtc", opt)
229     
230     def get_qemu_opts(self):
231         opt = self._opt
232         base = get_config(opt, "base", default="utc")
233         return [ "-rtc", f"base={base}" ]
234
235 class QEMUMonitor(QEMUPeripherals):
236     def __init__(self, opt) -> None:
237         super().__init__("monitor", opt)
238
239     def get_qemu_opts(self):
240         
241         chardev = select_backend(self._opt)
242
243         return [
244             "-chardev", chardev.to_cmdline(),
245             "-mon", join_attrs([
246                 chardev.name(),
247                 "mode=readline",
248             ])
249         ]
250
251 class QEMUDevices:
252     __devs = {
253         "isa-serial": ISASerialDevice,
254         "ahci": AHCIBus,
255         "rtc": RTCDevice,
256         "hmp": QEMUMonitor,
257         "pci-serial": PCISerialDevice
258     }
259
260     @staticmethod
261     def get(name):
262         if name not in QEMUDevices.__devs:
263             raise Exception(f"device class: {name} is not defined")
264         return QEMUDevices.__devs[name]
265
266
267
268 #################################
269 # QEMU Machine Definitions
270 #
271
272 class QEMUExec:
273     
274
275     def __init__(self, options) -> None:
276         self._opt = options
277         self._devices = []
278
279         for dev in get_config(options, "devices", default=[]):
280             dev_class = get_config(dev, "class")
281             device = QEMUDevices.get(dev_class)
282             self._devices.append(device(dev))
283     
284     def get_qemu_exec_name(self):
285         pass
286
287     def get_qemu_arch_opts(self):
288         cmds = [
289             "-machine", get_config(self._opt, "machine"),
290             "-cpu", join_attrs([ 
291                  get_config(self._opt, "cpu/type", required=True),
292                 *get_config(self._opt, "cpu/features", default=[]),
293              ])
294         ]
295
296         return cmds
297
298     def get_qemu_debug_opts(self):
299         cmds = [ "-no-reboot", "-no-shutdown" ]
300         debug = get_config(self._opt, "debug")
301         if not debug:
302             return cmds
303         
304         cmds.append("-S")
305         cmds += [ "-gdb", f"tcp::{get_config(debug, 'gdb_port', default=1234)}" ]
306
307         trace_opts = get_config(debug, "traced", [])
308         for trace in trace_opts:
309             cmds += [ "--trace", f"{trace}"]
310
311         return cmds
312     
313     def get_qemu_general_opts(self):
314         opts = [
315             "-m", get_config(self._opt, "memory", required=True),
316             "-smp", str(get_config(self._opt, "ncpu", default=1))
317         ]
318
319         kopts = get_config(self._opt, "kernel")
320         if kopts:
321             opts += [
322                 "-kernel", get_config(kopts, "bin", required=True),
323                 "-append", get_config(kopts, "cmd", required=True)
324             ]
325
326             dtb = get_config(kopts, "dtb")
327             if dtb:
328                 opts += [ "-dtb", dtb ]
329
330         return opts
331
332     def add_peripheral(self, peripheral):
333         self._devices.append(peripheral)
334
335     def start(self, qemu_dir_override="", dryrun=False, extras=[]):
336         qemu_path = self.get_qemu_exec_name()
337         qemu_path = os.path.join(qemu_dir_override, qemu_path)
338         cmds = [
339             qemu_path,
340             *self.get_qemu_general_opts(),
341             *self.get_qemu_arch_opts(),
342             *self.get_qemu_debug_opts()
343         ]
344
345         for dev in self._devices:
346             cmds += dev.get_qemu_opts()
347
348         cmds += extras
349         logger.info(" ".join(cmds))
350
351         if dryrun:
352             logger.info("[DRY RUN] QEMU not invoked")
353             return
354         
355         handle = subprocess.Popen(cmds)
356         logger.info(f"QEMU launched (pid={handle.pid})")
357         
358         while True:
359             ret_code = handle.poll()
360             if ret_code is not None:
361                 return ret_code
362             time.sleep(5)
363
364 class QEMUx86Exec(QEMUExec):
365     def __init__(self, options) -> None:
366         super().__init__(options)
367
368     def get_qemu_exec_name(self):
369         if get_config(self._opt, "arch") in ["i386", "x86_32"]:
370             return "qemu-system-i386"
371         else:
372             return "qemu-system-x86_64"
373
374 def main():
375     global g_lookup
376
377     arg = argparse.ArgumentParser()
378
379     arg.add_argument("config_file")
380     arg.add_argument("--qemu-dir", default="")
381     arg.add_argument("--dry", action='store_true')
382     arg.add_argument("-v", "--values", action='append', default=[])
383
384     arg_opt, extras = arg.parse_known_args()
385
386     opts = {}
387     with open(arg_opt.config_file, 'r') as f:
388         opts.update(json.loads(f.read()))
389     
390     for kv in arg_opt.values:
391         splits = kv.split('=')
392         k, v = splits[0], "=".join(splits[1:])
393         g_lookup[k] = v
394
395     arch = get_config(opts, "arch")
396
397     q = None
398     if arch in ["i386", "x86_32", "x86_64"]:
399         q = QEMUx86Exec(opts)
400     else:
401         raise Exception(f"undefined arch: {arch}")
402     
403     extras = [ x for x in extras if x != '--']
404     q.start(arg_opt.qemu_dir, arg_opt.dry, extras)
405
406 if __name__ == "__main__":
407     main()