X-Git-Url: https://scm.lunaixsky.com/lunaix-os.git/blobdiff_plain/bcc25888b3299758ad36721530cca3b899b7166c..c043fa535514a76091be87a45ad472a505f9dd33:/lunaix-os/scripts/build-tools/lcfg/common.py diff --git a/lunaix-os/scripts/build-tools/lcfg/common.py b/lunaix-os/scripts/build-tools/lcfg/common.py deleted file mode 100644 index 383d1ec..0000000 --- a/lunaix-os/scripts/build-tools/lcfg/common.py +++ /dev/null @@ -1,233 +0,0 @@ -import os.path as path -import ast, json - -from .lcnodes import LCModuleNode, LCTermNode -from .api import ( - ConfigLoadException, - Renderable -) - -class LConfigFile: - def __init__(self, - env, - file) -> None: - self.__env = env - self.__file = env.to_wspath(file) - - with open(file) as f: - tree = ast.parse(f.read(), self.__file, mode='exec') - self.__module = LCModuleNode(self, tree) - - def filename(self): - return self.__file - - def env(self): - return self.__env - - def root_module(self): - return self.__module - - def compile_astns(self, astns): - if not isinstance(astns, list): - astns = [astns] - - return compile( - ast.Module(body=astns, type_ignores=[]), - self.__file, mode='exec') - -class DependencyGraph: - def __init__(self) -> None: - self._edges = {} - - def add(self, dependent, dependee): - if dependent in self._edges: - if dependee in self._edges[dependent]: - return - self._edges[dependent].add(dependee) - else: - self._edges[dependent] = set([dependee]) - - if self.__check_loop(dependee): - raise ConfigLoadException( - f"loop dependency detected: {dependent.get_name()}", - dependee) - - def __check_loop(self, start): - visited = set() - q = [start] - while len(q) > 0: - current = q.pop() - if current in visited: - return True - - visited.add(current) - if current not in self._edges: - continue - for x in self._edges[current]: - q.append(x) - - return False - - def cascade(self, start): - q = [start] - while len(q) > 0: - current = q.pop() - if current in self._edges: - for x in self._edges[current]: - q.append(x) - if current != start: - current.evaluate() - -class ConfigTypeFactory: - def __init__(self) -> None: - self.__providers = [] - - def regitser(self, provider_type): - self.__providers.append(provider_type) - - def create(self, typedef): - for provider in self.__providers: - if not provider.typedef_matched(typedef): - continue - return provider(typedef) - - raise ConfigLoadException( - f"no type provider defined for type: {typedef}", None) - -class LConfigEvaluationWrapper: - def __init__(self, env, node) -> None: - self.__env = env - self.__node = node - - def __enter__(self): - self.__env.push_executing_node(self.__node) - return self - - def __exit__(self, type, value, tb): - self.__env.pop_executing_node() - - def evaluate(self): - return self.__env.evaluate_node(self.__node) - -class LConfigEnvironment(Renderable): - def __init__(self, workspace, config_io) -> None: - super().__init__() - - self.__ws_path = path.abspath(workspace) - self.__exec_globl = {} - self.__eval_stack = [] - self.__lc_modules = [] - self.__config_val = {} - self.__node_table = {} - self.__deps_graph = DependencyGraph() - self.__type_fatry = ConfigTypeFactory() - self.__config_io = config_io - - def to_wspath(self, _path): - _path = path.abspath(_path) - return path.relpath(_path, self.__ws_path) - - def register_builtin_func(self, func_obj): - call = (lambda *arg, **kwargs: - func_obj(self, *arg, **kwargs)) - - self.__exec_globl[func_obj.name] = call - - def resolve_module(self, file): - fo = LConfigFile(self, file) - self.__lc_modules.insert(0, (fo.root_module())) - - def evaluate_node(self, node): - name = node.get_name() - - return self.get_symbol(name)() - - def eval_context(self, node): - return LConfigEvaluationWrapper(self, node) - - def push_executing_node(self, node): - self.__eval_stack.append(node) - - def pop_executing_node(self): - return self.__eval_stack.pop() - - def register_node(self, node): - self.__node_table[node.get_name()] = node - - l = {} - try: - self.push_executing_node(node) - exec(node.get_co(), self.__exec_globl, l) - self.pop_executing_node() - except Exception as e: - raise ConfigLoadException("failed to load", node, e) - - self.__exec_globl.update(l) - - def lookup_node(self, name): - if name not in self.__node_table: - raise ConfigLoadException(f"node '{name}' undefined") - return self.__node_table[name] - - def type_factory(self): - return self.__type_fatry - - def update_value(self, key, value): - self.__config_val[key] = value - - def get_symbol(self, name): - if name not in self.__exec_globl: - raise ConfigLoadException(f"unknown symbol '{name}'") - return self.__exec_globl[name] - - def callframe_at(self, traverse_level): - try: - return self.__eval_stack[traverse_level - 1] - except: - return None - - def lookup_value(self, key): - return self.__config_val[key] - - def resolve_symbol(self, sym): - term_node = self.__node_table[sym] - if isinstance(term_node, LCTermNode): - if not term_node.is_ready(): - term_node.evaluate() - return term_node.get_value() - raise Exception(f"fail to resolve symbol: {sym}, not resolvable") - - def dependency(self): - return self.__deps_graph - - def update(self): - for mod in self.__lc_modules: - mod.evaluate() - - def export(self): - self.__config_io.export(self, self.__config_val) - - def save(self, _path = ".config.json"): - data = {} - for mod in self.__lc_modules: - mod.serialise(data) - - with open(_path, 'w') as f: - json.dump(data, f) - - def load(self, _path = ".config.json"): - if not path.exists(_path): - return - - data = {} - with open(_path, 'r') as f: - data.update(json.load(f)) - - for mod in self.__lc_modules: - mod.deserialise(data) - - self.update() - - def render(self, rctx): - for mod in self.__lc_modules: - mod.render(rctx) \ No newline at end of file