cf869b058d07a845cf9665fbae7a795a3e482057
[lunaix-os.git] / lunaix-os / scripts / build-tools / lcfg / lcnodes.py
1 from .api import (
2     ConfigLoadException,
3     ConfigTypeCheckError,
4     Renderable
5 )
6
7 from .utils import (
8     extract_decorators, 
9     to_displayable
10 )
11
12 import ast, textwrap
13 from abc import abstractmethod
14
15
16
17 class LCNode(Renderable):
18     def __init__(self, fo, astn):
19         super().__init__()
20
21         self._fo = fo
22         self._env = fo.env()
23         self._astn = self.prune_astn(astn)
24         self._co = self.compile_astn()
25         self._parent = None
26
27         self._env.register_node(self)
28     
29     def prune_astn(self, astn):
30         return astn
31
32     def compile_astn(self):
33         return self._fo.compile_astns(self._astn)
34
35     def get_co(self):
36         return self._co
37     
38     def get_fo(self):
39         return self._fo
40     
41     def set_parent(self, new_parent):
42         self._parent = new_parent
43
44     def parent(self):
45         return self._parent
46     
47     def add_child(self, node):
48         pass
49
50     def remove_child(self, node):
51         pass
52     
53     def get_name(self):
54         return f"LCNode: {self.__hash__()}"
55     
56     @abstractmethod
57     def evaluate(self):
58         pass
59
60     def render(self, rctx):
61         pass
62
63     def deserialise(self, dict):
64         pass
65
66     def serialise(self, dict):
67         pass
68
69
70
71 class LCModuleNode(LCNode):
72     def __init__(self, fo, astn: ast.Module):
73         self.__nodes = {}
74
75         super().__init__(fo, astn)
76
77     def get_name(self):
78         return f"file: {self._fo.filename()}"
79
80     def prune_astn(self, astn: ast.Module):
81         general_exprs = []
82
83         for b in astn.body:
84             if not isinstance(b, ast.FunctionDef):
85                 general_exprs.append(b)
86                 continue
87             
88             node = LCFuncNode.construct(self._fo, b)
89             node.set_parent(self)
90
91             self.add_child(node)
92
93         return general_exprs
94     
95     def evaluate(self):
96         with self._env.eval_context(self) as evalc:
97             ls = list(self.__nodes.values())          
98             for node in ls:
99                 node.evaluate()
100
101     def add_child(self, node):
102         self.__nodes[node] = node
103
104     def remove_child(self, node):
105         if node in self.__nodes:
106             del self.__nodes[node]
107
108     def deserialise(self, dict):
109         for node in self.__nodes:
110             node.deserialise(dict)
111
112     def serialise(self, dict):
113         for node in self.__nodes:
114             node.serialise(dict)
115
116     def render(self, rctx):
117         for node in self.__nodes:
118             node.render(rctx)
119
120 class LCFuncNode(LCNode):
121     def __init__(self, fo, astn) -> None:
122         self._decors = {}
123         self._name = None
124         self._help = ""
125         self._display_name = None
126         self._enabled = True
127
128         super().__init__(fo, astn)
129
130     def prune_astn(self, astn: ast.FunctionDef):
131         self._name = astn.name
132         self._display_name = to_displayable(self._name)
133         
134         maybe_doc = astn.body[0]
135         if isinstance(maybe_doc, ast.Expr):
136             if isinstance(maybe_doc.value, ast.Constant):
137                 self._help = maybe_doc.value.value
138                 self._help = textwrap.dedent(self._help)
139         
140         decors = extract_decorators(astn)
141         for name, args, kwargs in decors:
142             self._decors[name] = (args, kwargs)
143
144         (args, _) = self._decors[self.mapped_name()]
145         if args:
146             self._display_name = args[0]
147         
148         astn.decorator_list.clear()
149         return astn
150     
151     def get_name(self):
152         return self._name
153     
154     def get_display_name(self):
155         return self._display_name
156     
157     def enabled(self):
158         if isinstance(self._parent, LCFuncNode):
159             return self._enabled and self._parent.enabled()
160         return self._enabled
161
162     def help_prompt(self):
163         return self._help
164     
165     def evaluate(self):
166         with self._env.eval_context(self) as evalc:
167             result = evalc.evaluate()
168             self._enabled = True if result is None else result
169             
170     @staticmethod
171     def mapped_name(self):
172         return None
173     
174     @staticmethod
175     def construct(fo, astn: ast.FunctionDef):
176         nodes = [
177             LCCollectionNode,
178             LCGroupNode,
179             LCTermNode
180         ]
181
182         for node in nodes:
183             if extract_decorators(astn, node.mapped_name(), True):
184                 return node(fo, astn)
185         
186         raise ConfigLoadException(
187                 f"unknown type for astn type: {type(astn)}")
188
189     def set_parent(self, new_parent):
190         if self._parent:
191             self._parent.remove_child(self)
192         
193         new_parent.add_child(self)
194         super().set_parent(new_parent)
195
196
197 class LCTermNode(LCFuncNode):
198     def __init__(self, fo, astn) -> None:
199         self._value = None
200         self._default = None
201         self._type = None
202         self._rdonly = False
203
204         super().__init__(fo, astn)
205
206     @staticmethod
207     def mapped_name():
208         return "Term"
209     
210     def prune_astn(self, astn: ast.FunctionDef):
211         astn = super().prune_astn(astn)
212
213         self._rdonly = "ReadOnly" in self._decors
214
215         return astn
216
217     def set_type(self, type_def):
218         self._type = self._env.type_factory().create(type_def)
219
220     def get_type(self):
221         return self._type
222
223     def __assert_type(self, val):
224         if not self._type:
225             raise ConfigLoadException(
226                     f"config: {self._name} must be typed", self)
227         
228         if self._type.check(val):
229            return
230
231         raise ConfigTypeCheckError(
232                 f"value: {val} does not match type {self._type}", self)
233
234     def set_value(self, val):
235         if self._rdonly:
236             return
237         
238         if isinstance(val, str):
239             val = self._type.parse_input(val)
240         
241         self.__assert_type(val)
242         self._value = val
243         self.__update_value()
244         self._env.dependency().cascade(self)
245
246     def set_default(self, val):
247         self.__assert_type(val)
248         self._default = val
249
250         if self._rdonly:
251             self._value = val
252
253     def get_value(self):
254         return self._value
255     
256     def evaluate(self):
257         super().evaluate()
258         self.__update_value()
259
260     def read_only(self):
261         return self._rdonly
262
263     def render(self, rctx):
264         if self.enabled():
265             rctx.add_field(self._display_name, self)
266
267     def serialise(self, dict):
268         s_val = self._type.serialise(self._value)
269         dict[self._name] = s_val
270
271     def deserialise(self, dict):
272         if self._name not in dict:
273             return
274         
275         s_val = dict[self._name]
276         v = self._type.deserialise(s_val)
277         self.__assert_type(v)
278         self._value = v
279
280
281     def __update_value(self):
282         v = self._value
283
284         if not self.enabled():
285             self._env.update_value(self._name, None)
286             return
287
288         if v is None:
289             v = self._default
290
291         if v is None and not self._type.allow_none():
292             raise ConfigLoadException(
293                     f"config: {self._name} must have a value", self)
294         
295         self._value = v
296         self._env.update_value(self._name, v)
297     
298
299
300
301 class LCGroupNode(LCFuncNode):
302     def __init__(self, fo, astn) -> None:
303         self._children = {}
304
305         super().__init__(fo, astn)
306
307     @staticmethod
308     def mapped_name():
309         return "Group"
310
311     def prune_astn(self, astn: ast.FunctionDef):
312         astn = super().prune_astn(astn)
313
314         other_exprs = []
315         for expr in astn.body:
316             if not isinstance(expr, ast.FunctionDef):
317                 other_exprs.append(expr)
318                 continue
319
320             node = LCFuncNode.construct(self._fo, expr)
321             node.set_parent(self)
322             self._children[node] = node
323
324         if not other_exprs:
325             other_exprs.append(
326                 ast.Pass(
327                     lineno=astn.lineno + 1, 
328                     col_offset=astn.col_offset
329                 )
330             )
331
332         astn.body = other_exprs
333         return astn
334
335     def evaluate(self):
336         old_enable = super().enabled()
337         super().evaluate()
338
339         new_enabled = super().enabled()
340         if not new_enabled and old_enable == new_enabled:
341             return
342
343         with self._env.eval_context(self) as evalc:
344             children = list(self._children.values())
345             for child in children:
346                 child.evaluate()
347         
348     def render(self, rctx):
349         for child in self._children.values():
350             child.render(rctx)
351
352     def serialise(self, dict):
353         sub_dict = {}
354         for child in self._children.values():
355             child.serialise(sub_dict)
356         
357         dict[self._name] = sub_dict
358
359     def deserialise(self, dict):
360         if self._name not in dict:
361             return
362         
363         sub_dict = dict[self._name]
364         for child in self._children.values():
365             child.deserialise(sub_dict)
366     
367     def add_child(self, node):
368         self._children[node] = node
369
370     def remove_child(self, node):
371         if node in self._children:
372             del self._children[node]
373
374
375 class LCCollectionNode(LCGroupNode):
376     def __init__(self, fo, astn) -> None:
377         super().__init__(fo, astn)
378
379     @staticmethod
380     def mapped_name():
381         return "Collection"
382     
383     def render(self, rctx):
384         _super = super()
385         rctx.add_expandable(
386             self._display_name,
387             self, 
388             lambda _ctx: 
389                 _super.render(_ctx)
390         )