Second Extended Filesystem (ext2) and other improvements (#33)
[lunaix-os.git] / lunaix-os / scripts / qemu.py
1 #!/usr/bin/env python 
2
3 import subprocess, time, os, re, argparse, json
4 from pathlib import PurePosixPath
5 import logging
6
7 logger = logging.getLogger("auto_qemu")
8
9 g_lookup = {}
10
11 subsitution = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*")
12
13 def do_expand(val, path):
14     global g_lookup
15
16     parts = []
17     prev_pos = 0
18     iters = subsitution.finditer(val)
19     for _, m in enumerate(iters, start=1):
20         start_pos = m.start()
21         end_pos = m.end()
22
23         parts.append(val[prev_pos:start_pos])
24
25         key = val[start_pos + 1:end_pos]
26         if key not in g_lookup:
27             raise Exception(
28                 f"reference key {key} (config: {path}) is not defined")
29
30         parts.append(g_lookup[key])
31         prev_pos = end_pos
32
33     parts.append(val[prev_pos:])
34     return "".join(parts)
35
36 def get_config(opt, path, default=None, required=False):
37     _path = PurePosixPath(path)
38     for p in _path.parts:
39         if p in opt:
40             opt = opt[p]
41             continue
42         if required:
43             raise Exception(f"config: {path} is required")
44         return default
45         
46     if not isinstance(opt, str):
47         return opt
48
49     return do_expand(opt, path)
50
51 def join_attrs(attrs):
52     return ",".join(attrs)
53
54 def parse_protocol(opt):
55     protocol = get_config(opt, "protocol", "telnet")
56     addr     = get_config(opt, "addr", ":12345")
57     logfile  = get_config(opt, "logfile")
58
59     return (f"{protocol}:{addr}", logfile)
60
61 class QEMUPeripherals:
62     def __init__(self, name, opt) -> None:
63         self.name = name
64         self._opt = opt
65
66     def get_qemu_opts(self) -> list:
67         pass
68
69 class BasicSerialDevice(QEMUPeripherals):
70     def __init__(self, opt) -> None:
71         super().__init__("serial", opt)
72
73     def get_qemu_opts(self):
74         link, logfile = parse_protocol(self._opt)
75
76         cmds = [ link, "server", "nowait" ]
77         if logfile:
78             cmds.append(f"logfile={logfile}")
79         return [ "-serial", join_attrs(cmds) ]
80     
81 class PCISerialDevice(QEMUPeripherals):
82     def __init__(self, opt) -> None:
83         super().__init__("pci-serial", opt)
84
85     def get_qemu_opts(self):
86         uniq = hex(self.__hash__())[2:]
87         name = f"chrdev.{uniq}"
88         cmds = [ "pci-serial", f"id=uart.{uniq}", f"chardev={name}" ]
89         chrdev = [ "file", f"id={name}" ]
90         
91         logfile = get_config(self._opt, "logfile", required=True)
92         chrdev.append(f"path={logfile}")
93
94         return [ 
95             "-chardev", join_attrs(chrdev),
96             "-device", join_attrs(cmds)
97          ]
98     
99 class AHCIBus(QEMUPeripherals):
100     def __init__(self, opt) -> None:
101         super().__init__("ahci", opt)
102
103     def get_qemu_opts(self):
104         opt = self._opt
105         name: str = get_config(opt, "name", required=True)
106         name = name.strip().replace(" ", "_")
107         cmds = [ "-device", f"ahci,id={name}" ]
108
109         for i, disk in enumerate(get_config(opt, "disks", default=[])):
110             d_type = get_config(disk, "type",   default="ide-hd")
111             d_img  = get_config(disk, "img",    required=True)
112             d_ro   = get_config(disk, "ro",     default=False)
113             d_fmt  = get_config(disk, "format", default="raw")
114             d_id   = f"disk_{i}"
115
116             if not os.path.exists(d_img):
117                 logger.warning(f"AHCI bus: {d_img} not exists, disk skipped")
118                 continue
119             
120             cmds += [
121                 "-drive", join_attrs([
122                     f"id={d_id},"
123                     f"file={d_img}",
124                     f"readonly={'on' if d_ro else 'off'}",
125                     f"if=none",
126                     f"format={d_fmt}"
127                 ]),
128                 "-device", join_attrs([
129                     d_type,
130                     f"drive={d_id}",
131                     f"bus={name}.{i}"
132                 ])
133             ]
134         
135         return cmds
136     
137 class RTCDevice(QEMUPeripherals):
138     def __init__(self, opt) -> None:
139         super().__init__("rtc", opt)
140     
141     def get_qemu_opts(self):
142         opt = self._opt
143         base = get_config(opt, "base", default="utc")
144         return [ "-rtc", f"base={base}" ]
145
146 class QEMUMonitor(QEMUPeripherals):
147     def __init__(self, opt) -> None:
148         super().__init__("monitor", opt)
149
150     def get_qemu_opts(self):
151         link, logfile = parse_protocol(self._opt)
152
153         return [
154             "-monitor", join_attrs([
155                 link,
156                 "server",
157                 "nowait", 
158                 f"logfile={logfile}"
159             ])
160         ]
161
162 class QEMUExec:
163     devices = {
164         "basic_serial": BasicSerialDevice,
165         "ahci": AHCIBus,
166         "rtc": RTCDevice,
167         "hmp": QEMUMonitor,
168         "pci-serial": PCISerialDevice
169     }
170
171     def __init__(self, options) -> None:
172         self._opt = options
173         self._devices = []
174
175         for dev in get_config(options, "devices", default=[]):
176             dev_class = get_config(dev, "class")
177             if dev_class not in QEMUExec.devices:
178                 raise Exception(f"device class: {dev_class} is not defined")
179             
180             self._devices.append(QEMUExec.devices[dev_class](dev))
181     
182     def get_qemu_exec_name(self):
183         pass
184
185     def get_qemu_arch_opts(self):
186         cmds = [
187             "-machine", get_config(self._opt, "machine"),
188             "-cpu", join_attrs([ 
189                  get_config(self._opt, "cpu/type", required=True),
190                 *get_config(self._opt, "cpu/features", default=[]),
191              ])
192         ]
193
194         return cmds
195
196     def get_qemu_debug_opts(self):
197         cmds = [ "-no-reboot", "-no-shutdown" ]
198         debug = get_config(self._opt, "debug")
199         if not debug:
200             return cmds
201         
202         cmds.append("-S")
203         cmds += [ "-gdb", f"tcp::{get_config(debug, 'gdb_port', default=1234)}" ]
204
205         trace_opts = get_config(debug, "traced", [])
206         for trace in trace_opts:
207             cmds += [ "--trace", f"{trace}"]
208
209         return cmds
210     
211     def get_qemu_general_opts(self):
212         return [
213             "-m", get_config(self._opt, "memory", required=True),
214             "-smp", str(get_config(self._opt, "ncpu", default=1))
215         ]
216
217     def add_peripheral(self, peripheral):
218         self._devices.append(peripheral)
219
220     def start(self, qemu_dir_override=""):
221         qemu_path = self.get_qemu_exec_name()
222         qemu_path = os.path.join(qemu_dir_override, qemu_path)
223         cmds = [
224             qemu_path,
225             *self.get_qemu_general_opts(),
226             *self.get_qemu_arch_opts(),
227             *self.get_qemu_debug_opts()
228         ]
229
230         for dev in self._devices:
231             cmds += dev.get_qemu_opts()
232
233         print(" ".join(cmds), "\n")
234         
235         handle = subprocess.Popen(cmds)
236         
237         while True:
238             ret_code = handle.poll()
239             if ret_code is not None:
240                 return ret_code
241             time.sleep(5)
242
243 class QEMUx86Exec(QEMUExec):
244     def __init__(self, options) -> None:
245         super().__init__(options)
246
247     def get_qemu_exec_name(self):
248         if get_config(self._opt, "arch") in ["i386", "x86_32"]:
249             return "qemu-system-i386"
250         else:
251             return "qemu-system-x86_64"
252
253 def main():
254     global g_lookup
255
256     arg = argparse.ArgumentParser()
257
258     arg.add_argument("config_file")
259     arg.add_argument("--qemu-dir", default="")
260     arg.add_argument("-v", "--values", action='append', default=[])
261
262     arg_opt = arg.parse_args()
263
264     opts = {}
265     with open(arg_opt.config_file, 'r') as f:
266         opts.update(json.loads(f.read()))
267     
268     for kv in arg_opt.values:
269         [k, v] = kv.split('=')
270         g_lookup[k] = v
271
272     arch = get_config(opts, "arch")
273
274     q = None
275     if arch in ["i386", "x86_32", "x86_64"]:
276         q = QEMUx86Exec(opts)
277     else:
278         raise Exception(f"undefined arch: {arch}")
279     
280     q.start(arg_opt.qemu_dir)
281
282 if __name__ == "__main__":
283     main()