4 from .lcnodes import LCModuleNode, LCTermNode
15 self.__file = env.to_wspath(file)
18 tree = ast.parse(f.read(), self.__file, mode='exec')
19 self.__module = LCModuleNode(self, tree)
27 def root_module(self):
30 def compile_astns(self, astns):
31 if not isinstance(astns, list):
35 ast.Module(body=astns, type_ignores=[]),
36 self.__file, mode='exec')
38 class DependencyGraph:
39 def __init__(self) -> None:
42 def add(self, dependent, dependee):
43 if dependent in self._edges:
44 if dependee in self._edges[dependent]:
46 self._edges[dependent].add(dependee)
48 self._edges[dependent] = set([dependee])
50 if self.__check_loop(dependee):
51 raise ConfigLoadException(
52 f"loop dependency detected: {dependent.get_name()}",
55 def __check_loop(self, start):
60 if current in visited:
64 if current not in self._edges:
66 for x in self._edges[current]:
71 def cascade(self, start):
75 if current in self._edges:
76 for x in self._edges[current]:
81 class ConfigTypeFactory:
82 def __init__(self) -> None:
def regitser(self, provider_type):
    """Append ``provider_type`` to this factory's list of providers.

    Args:
        provider_type: The provider to record; it is stored exactly as
            given and placed after any previously added providers.
    """
    known_providers = self.__providers
    known_providers.append(provider_type)
88 def create(self, typedef):
89 for provider in self.__providers:
90 if not provider.typedef_matched(typedef):
92 return provider(typedef)
94 raise ConfigLoadException(
95 f"no type provider defined for type: {typedef}", None)
97 class LConfigEvaluationWrapper:
98 def __init__(self, env, node) -> None:
103 self.__env.push_executing_node(self.__node)
106 def __exit__(self, type, value, tb):
107 self.__env.pop_executing_node()
110 return self.__env.evaluate_node(self.__node)
112 class LConfigEnvironment(Renderable):
113 def __init__(self, workspace, config_io) -> None:
116 self.__ws_path = path.abspath(workspace)
117 self.__exec_globl = globals()
118 self.__eval_stack = []
119 self.__lc_modules = []
120 self.__config_val = {}
121 self.__node_table = {}
122 self.__deps_graph = DependencyGraph()
123 self.__type_fatry = ConfigTypeFactory()
124 self.__config_io = config_io
def to_wspath(self, _path):
    """Express ``_path`` relative to the workspace directory.

    Args:
        _path: A filesystem path; it is made absolute before conversion.

    Returns:
        ``_path`` rewritten as a path relative to ``self.__ws_path``.
    """
    absolute = path.abspath(_path)
    workspace_root = self.__ws_path
    return path.relpath(absolute, workspace_root)
130 def register_builtin_func(self, func_obj):
131 call = (lambda *arg, **kwargs:
132 func_obj(self, *arg, **kwargs))
134 self.__exec_globl[func_obj.name] = call
def resolve_module(self, file):
    """Load ``file`` and place its root module first in the module list.

    Args:
        file: Passed straight to ``LConfigFile`` together with this
            environment.
    """
    config_file = LConfigFile(self, file)
    modules = self.__lc_modules
    # Newest module goes to the front, ahead of earlier ones.
    modules.insert(0, config_file.root_module())
140 def evaluate_node(self, node):
141 name = node.get_name()
143 return self.get_symbol(name)()
def eval_context(self, node):
    """Build an evaluation wrapper binding this environment to ``node``.

    Args:
        node: The node handed to ``LConfigEvaluationWrapper``.

    Returns:
        A new ``LConfigEvaluationWrapper`` created from ``self`` and
        ``node``.
    """
    wrapper = LConfigEvaluationWrapper(self, node)
    return wrapper
def push_executing_node(self, node):
    """Push ``node`` onto the top of the evaluation stack.

    Args:
        node: The entry to append to ``self.__eval_stack``.
    """
    stack = self.__eval_stack
    stack.append(node)
def pop_executing_node(self):
    """Remove and return the entry on top of the evaluation stack.

    Returns:
        The most recently pushed entry of ``self.__eval_stack``.
    """
    stack = self.__eval_stack
    top = stack.pop()
    return top
154 def register_node(self, node):
155 self.__node_table[node.get_name()] = node
159 self.push_executing_node(node)
160 exec(node.get_co(), self.__exec_globl, l)
161 self.pop_executing_node()
162 except Exception as e:
163 raise ConfigLoadException("failed to load", node, e)
165 self.__exec_globl.update(l)
def lookup_node(self, name):
    """Return the node registered under ``name``.

    Args:
        name: Key to look up in the node table.

    Returns:
        The entry stored in ``self.__node_table`` for ``name``.

    Raises:
        ConfigLoadException: If ``name`` is not in the node table.
    """
    table = self.__node_table
    if name in table:
        return table[name]
    raise ConfigLoadException(f"node '{name}' undefined")
def type_factory(self):
    """Return the type factory object held by this environment.

    Returns:
        The value stored in ``self.__type_fatry``.
    """
    factory = self.__type_fatry
    return factory
def update_value(self, key, value):
    """Store ``value`` under ``key`` in the configuration value table.

    Args:
        key: Key to set in ``self.__config_val``.
        value: Value to associate with ``key``; any previous value for
            the same key is replaced.
    """
    values = self.__config_val
    values[key] = value
def get_symbol(self, name):
    """Return the object bound to ``name`` in the execution globals.

    Args:
        name: Symbol name to look up in ``self.__exec_globl``.

    Returns:
        The object stored under ``name``.

    Raises:
        ConfigLoadException: If ``name`` is not present.
    """
    symbols = self.__exec_globl
    if name in symbols:
        return symbols[name]
    raise ConfigLoadException(f"unknown symbol '{name}'")
183 def callframe_at(self, traverse_level):
185 return self.__eval_stack[traverse_level - 1]
def lookup_value(self, key):
    """Return the configuration value stored under ``key``.

    Args:
        key: Key to read from ``self.__config_val``.

    Returns:
        The value associated with ``key``. The lookup is a plain
        subscript, so a missing key propagates the container's own
        lookup error.
    """
    values = self.__config_val
    return values[key]
192 def resolve_symbol(self, sym):
193 term_node = self.__node_table[sym]
194 if isinstance(term_node, LCTermNode):
195 if not term_node.is_ready():
197 return term_node.get_value()
198 raise Exception(f"fail to resolve symbol: {sym}, not resolvable")
def dependency(self):
    """Return the dependency graph object held by this environment.

    Returns:
        The value stored in ``self.__deps_graph``.
    """
    graph = self.__deps_graph
    return graph
204 for mod in self.__lc_modules:
208 self.__config_io.export(self, self.__config_val)
210 def save(self, _path = ".config.json"):
212 for mod in self.__lc_modules:
215 with open(_path, 'w') as f:
218 def load(self, _path = ".config.json"):
219 if not path.exists(_path):
223 with open(_path, 'r') as f:
224 data.update(json.load(f))
226 for mod in self.__lc_modules:
227 mod.deserialise(data)
231 def render(self, rctx):
232 for mod in self.__lc_modules: