fix: gen_ksymtable does not work on non-English platform
[lunaix-os.git] / lunaix-os / scripts / build-tools / lcfg / common.py
1 import os.path as path
2 import ast, json
3
4 from .lcnodes import LCModuleNode, LCTermNode
5 from .api import (
6     ConfigLoadException, 
7     Renderable
8 )
9
10 class LConfigFile:
11     def __init__(self, 
12                  env, 
13                  file) -> None:
14         self.__env = env
15         self.__file = env.to_wspath(file)
16
17         with open(file) as f:
18             tree = ast.parse(f.read(), self.__file, mode='exec')
19             self.__module = LCModuleNode(self, tree)
20
21     def filename(self):
22         return self.__file
23     
24     def env(self):
25         return self.__env
26     
27     def root_module(self):
28         return self.__module
29     
30     def compile_astns(self, astns):
31         if not isinstance(astns, list):
32             astns = [astns]
33
34         return compile(
35             ast.Module(body=astns, type_ignores=[]), 
36             self.__file, mode='exec')
37     
38 class DependencyGraph:
39     def __init__(self) -> None:
40         self._edges = {}
41
42     def add(self, dependent, dependee):
43         if dependent in self._edges:
44             if dependee in self._edges[dependent]:
45                 return
46             self._edges[dependent].add(dependee)
47         else:
48             self._edges[dependent] = set([dependee])
49         
50         if self.__check_loop(dependee):
51             raise ConfigLoadException(
52                 f"loop dependency detected: {dependent.get_name()}",
53                 dependee)
54
55     def __check_loop(self, start):
56         visited = set()
57         q = [start]
58         while len(q) > 0:
59             current = q.pop()
60             if current in visited:
61                 return True
62             
63             visited.add(current)
64             if current not in self._edges:
65                 continue
66             for x in self._edges[current]:
67                 q.append(x)
68         
69         return False
70     
71     def cascade(self, start):
72         q = [start]
73         while len(q) > 0:
74             current = q.pop()
75             if current in self._edges:
76                 for x in self._edges[current]:
77                     q.append(x)
78             if current != start:
79                 current.evaluate()
80
81 class ConfigTypeFactory:
82     def __init__(self) -> None:
83         self.__providers = []
84
85     def regitser(self, provider_type):
86         self.__providers.append(provider_type)
87
88     def create(self, typedef):
89         for provider in self.__providers:
90             if not provider.typedef_matched(typedef):
91                 continue
92             return provider(typedef)
93         
94         raise ConfigLoadException(
95                 f"no type provider defined for type: {typedef}", None)
96
97 class LConfigEvaluationWrapper:
98     def __init__(self, env, node) -> None:
99         self.__env = env
100         self.__node = node
101
102     def __enter__(self):
103         self.__env.push_executing_node(self.__node)
104         return self
105
106     def __exit__(self, type, value, tb):
107         self.__env.pop_executing_node()
108
109     def evaluate(self):
110         return self.__env.evaluate_node(self.__node)
111
112 class LConfigEnvironment(Renderable):
113     def __init__(self, workspace, config_io) -> None:
114         super().__init__()
115         
116         self.__ws_path = path.abspath(workspace)
117         self.__exec_globl = {}
118         self.__eval_stack = []
119         self.__lc_modules = []
120         self.__config_val = {}
121         self.__node_table = {}
122         self.__deps_graph = DependencyGraph()
123         self.__type_fatry = ConfigTypeFactory()
124         self.__config_io  = config_io
125
126     def to_wspath(self, _path):
127         _path = path.abspath(_path)
128         return path.relpath(_path, self.__ws_path)
129
130     def register_builtin_func(self, func_obj):
131         call = (lambda *arg, **kwargs:
132                      func_obj(self, *arg, **kwargs))
133         
134         self.__exec_globl[func_obj.name] = call
135
136     def resolve_module(self, file):
137         fo = LConfigFile(self, file)
138         self.__lc_modules.insert(0, (fo.root_module()))
139
140     def evaluate_node(self, node):
141         name = node.get_name()
142         
143         return self.get_symbol(name)()
144     
145     def eval_context(self, node):
146         return LConfigEvaluationWrapper(self, node)
147     
148     def push_executing_node(self, node):
149         self.__eval_stack.append(node)
150
151     def pop_executing_node(self):
152         return self.__eval_stack.pop()
153
154     def register_node(self, node):
155         self.__node_table[node.get_name()] = node
156
157         l = {}
158         try:
159             self.push_executing_node(node)
160             exec(node.get_co(), self.__exec_globl, l)
161             self.pop_executing_node()
162         except Exception as e:
163             raise ConfigLoadException("failed to load", node, e)
164         
165         self.__exec_globl.update(l)
166
167     def lookup_node(self, name):
168         if name not in self.__node_table:
169             raise ConfigLoadException(f"node '{name}' undefined")
170         return self.__node_table[name]
171
172     def type_factory(self):
173         return self.__type_fatry
174     
175     def update_value(self, key, value):
176         self.__config_val[key] = value
177
178     def get_symbol(self, name):
179         if name not in self.__exec_globl:
180             raise ConfigLoadException(f"unknown symbol '{name}'")
181         return self.__exec_globl[name]
182
183     def callframe_at(self, traverse_level):
184         try:
185             return self.__eval_stack[traverse_level - 1]
186         except:
187             return None
188     
189     def lookup_value(self, key):
190         return self.__config_val[key]
191     
192     def resolve_symbol(self, sym):
193         term_node = self.__node_table[sym]
194         if isinstance(term_node, LCTermNode):
195             if not term_node.is_ready():
196                 term_node.evaluate()
197             return term_node.get_value()
198         raise Exception(f"fail to resolve symbol: {sym}, not resolvable")
199         
200     def dependency(self):
201         return self.__deps_graph
202     
203     def update(self):
204         for mod in self.__lc_modules:
205             mod.evaluate()
206     
207     def export(self):
208         self.__config_io.export(self, self.__config_val)
209     
210     def save(self, _path = ".config.json"):
211         data = {}
212         for mod in self.__lc_modules:
213             mod.serialise(data)
214
215         with open(_path, 'w') as f:
216             json.dump(data, f)
217
218     def load(self, _path = ".config.json"):
219         if not path.exists(_path):
220             return
221         
222         data = {}
223         with open(_path, 'r') as f:
224             data.update(json.load(f))
225         
226         for mod in self.__lc_modules:
227             mod.deserialise(data)
228
229         self.update()
230
231     def render(self, rctx):
232         for mod in self.__lc_modules:
233             mod.render(rctx)