Change of vterm handling logic on backend chardev input event (#40)
[lunaix-os.git] / lunaix-os / scripts / qemu.py
1 #!/usr/bin/env python 
2
3 import subprocess, time, os, re, argparse, json
4 from pathlib import PurePosixPath
5
6 g_lookup = {}
7
8 subsitution = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*")
9
10 def do_expand(val, path):
11     global g_lookup
12
13     parts = []
14     prev_pos = 0
15     iters = subsitution.finditer(val)
16     for _, m in enumerate(iters, start=1):
17         start_pos = m.start()
18         end_pos = m.end()
19
20         parts.append(val[prev_pos:start_pos])
21
22         key = val[start_pos + 1:end_pos]
23         if key not in g_lookup:
24             raise Exception(
25                 f"reference key {key} (config: {path}) is not defined")
26
27         parts.append(g_lookup[key])
28         prev_pos = end_pos
29
30     parts.append(val[prev_pos:])
31     return "".join(parts)
32
33 def get_config(opt, path, default=None, required=False):
34     _path = PurePosixPath(path)
35     for p in _path.parts:
36         if p in opt:
37             opt = opt[p]
38             continue
39         if required:
40             raise Exception(f"config: {path} is required")
41         return default
42         
43     if not isinstance(opt, str):
44         return opt
45
46     return do_expand(opt, path)
47
48 def join_attrs(attrs):
49     return ",".join(attrs)
50
51 def parse_protocol(opt):
52     protocol = get_config(opt, "protocol", "telnet")
53     addr     = get_config(opt, "addr", ":12345")
54     logfile  = get_config(opt, "logfile")
55
56     return (f"{protocol}:{addr}", logfile)
57
58 class QEMUPeripherals:
59     def __init__(self, name, opt) -> None:
60         self.name = name
61         self._opt = opt
62
63     def get_qemu_opts(self) -> list:
64         pass
65
66 class BasicSerialDevice(QEMUPeripherals):
67     def __init__(self, opt) -> None:
68         super().__init__("serial", opt)
69
70     def get_qemu_opts(self):
71         link, logfile = parse_protocol(self._opt)
72
73         cmds = [ link, "server", "nowait" ]
74         if logfile:
75             cmds.append(f"logfile={logfile}")
76         return [ "-serial", join_attrs(cmds) ]
77     
78 class PCISerialDevice(QEMUPeripherals):
79     def __init__(self, opt) -> None:
80         super().__init__("pci-serial", opt)
81
82     def get_qemu_opts(self):
83         uniq = hex(self.__hash__())[2:]
84         name = f"chrdev.{uniq}"
85         cmds = [ "pci-serial", f"id=uart.{uniq}", f"chardev={name}" ]
86         chrdev = [ "file", f"id={name}" ]
87         
88         logfile = get_config(self._opt, "logfile", required=True)
89         chrdev.append(f"path={logfile}")
90
91         return [ 
92             "-chardev", join_attrs(chrdev),
93             "-device", join_attrs(cmds)
94          ]
95     
96 class AHCIBus(QEMUPeripherals):
97     def __init__(self, opt) -> None:
98         super().__init__("ahci", opt)
99
100     def get_qemu_opts(self):
101         opt = self._opt
102         name: str = get_config(opt, "name", required=True)
103         name = name.strip().replace(" ", "_")
104         cmds = [ "-device", f"ahci,id={name}" ]
105
106         for i, disk in enumerate(get_config(opt, "disks", default=[])):
107             d_type = get_config(disk, "type",   default="ide-hd")
108             d_img  = get_config(disk, "img",    required=True)
109             d_ro   = get_config(disk, "ro",     default=False)
110             d_fmt  = get_config(disk, "format", default="raw")
111             d_id   = f"disk_{i}"
112             
113             cmds += [
114                 "-drive", join_attrs([
115                     f"id={d_id},"
116                     f"file={d_img}",
117                     f"readonly={'on' if d_ro else 'off'}",
118                     f"if=none",
119                     f"format={d_fmt}"
120                 ]),
121                 "-device", join_attrs([
122                     d_type,
123                     f"drive={d_id}",
124                     f"bus={name}.{i}"
125                 ])
126             ]
127         
128         return cmds
129     
130 class RTCDevice(QEMUPeripherals):
131     def __init__(self, opt) -> None:
132         super().__init__("rtc", opt)
133     
134     def get_qemu_opts(self):
135         opt = self._opt
136         base = get_config(opt, "base", default="utc")
137         return [ "-rtc", f"base={base}" ]
138
139 class QEMUMonitor(QEMUPeripherals):
140     def __init__(self, opt) -> None:
141         super().__init__("monitor", opt)
142
143     def get_qemu_opts(self):
144         link, logfile = parse_protocol(self._opt)
145
146         return [
147             "-monitor", join_attrs([
148                 link,
149                 "server",
150                 "nowait", 
151                 f"logfile={logfile}"
152             ])
153         ]
154
155 class QEMUExec:
156     devices = {
157         "basic_serial": BasicSerialDevice,
158         "ahci": AHCIBus,
159         "rtc": RTCDevice,
160         "hmp": QEMUMonitor,
161         "pci-serial": PCISerialDevice
162     }
163
164     def __init__(self, options) -> None:
165         self._opt = options
166         self._devices = []
167
168         for dev in get_config(options, "devices", default=[]):
169             dev_class = get_config(dev, "class")
170             if dev_class not in QEMUExec.devices:
171                 raise Exception(f"device class: {dev_class} is not defined")
172             
173             self._devices.append(QEMUExec.devices[dev_class](dev))
174     
175     def get_qemu_exec_name(self):
176         pass
177
178     def get_qemu_arch_opts(self):
179         cmds = [
180             "-machine", get_config(self._opt, "machine"),
181             "-cpu", join_attrs([ 
182                  get_config(self._opt, "cpu/type", required=True),
183                 *get_config(self._opt, "cpu/features", default=[]),
184              ])
185         ]
186
187         return cmds
188
189     def get_qemu_debug_opts(self):
190         cmds = [ "-no-reboot", "-no-shutdown" ]
191         debug = get_config(self._opt, "debug")
192         if not debug:
193             return cmds
194         
195         cmds.append("-S")
196         cmds += [ "-gdb", f"tcp::{get_config(debug, 'gdb_port', default=1234)}" ]
197
198         trace_opts = get_config(debug, "traced", [])
199         for trace in trace_opts:
200             cmds += [ "--trace", f"{trace}"]
201
202         return cmds
203     
204     def get_qemu_general_opts(self):
205         return [
206             "-m", get_config(self._opt, "memory", required=True),
207             "-smp", get_config(self._opt, "ncpu", default=1)
208         ]
209
210     def add_peripheral(self, peripheral):
211         self._devices.append(peripheral)
212
213     def start(self, qemu_dir_override=""):
214         qemu_path = self.get_qemu_exec_name()
215         qemu_path = os.path.join(qemu_dir_override, qemu_path)
216         cmds = [
217             qemu_path,
218             *self.get_qemu_arch_opts(),
219             *self.get_qemu_debug_opts()
220         ]
221
222         for dev in self._devices:
223             cmds += dev.get_qemu_opts()
224
225         print(" ".join(cmds), "\n")
226         
227         handle = subprocess.Popen(cmds)
228         
229         while True:
230             ret_code = handle.poll()
231             if ret_code is not None:
232                 return ret_code
233             time.sleep(5)
234
235 class QEMUx86Exec(QEMUExec):
236     def __init__(self, options) -> None:
237         super().__init__(options)
238
239     def get_qemu_exec_name(self):
240         if get_config(self._opt, "arch") in ["i386", "x86_32"]:
241             return "qemu-system-i386"
242         else:
243             return "qemu-system-x86_64"
244
245 def main():
246     global g_lookup
247
248     arg = argparse.ArgumentParser()
249
250     arg.add_argument("config_file")
251     arg.add_argument("--qemu-dir", default="")
252     arg.add_argument("-v", "--values", action='append', default=[])
253
254     arg_opt = arg.parse_args()
255
256     opts = {}
257     with open(arg_opt.config_file, 'r') as f:
258         opts.update(json.loads(f.read()))
259     
260     for kv in arg_opt.values:
261         [k, v] = kv.split('=')
262         g_lookup[k] = v
263
264     arch = get_config(opts, "arch")
265
266     q = None
267     if arch in ["i386", "x86_32", "x86_64"]:
268         q = QEMUx86Exec(opts)
269     else:
270         raise Exception(f"undefined arch: {arch}")
271     
272     q.start(arg_opt.qemu_dir)
273
274 if __name__ == "__main__":
275     try:
276         main()
277     except Exception as e:
278         print(e)