-import os.path as path
-import ast, json
-
-from .lcnodes import LCModuleNode, LCTermNode
-from .api import (
- ConfigLoadException,
- Renderable
-)
-
-class LConfigFile:
- def __init__(self,
- env,
- file) -> None:
- self.__env = env
- self.__file = env.to_wspath(file)
-
- with open(file) as f:
- tree = ast.parse(f.read(), self.__file, mode='exec')
- self.__module = LCModuleNode(self, tree)
-
- def filename(self):
- return self.__file
-
- def env(self):
- return self.__env
-
- def root_module(self):
- return self.__module
-
- def compile_astns(self, astns):
- if not isinstance(astns, list):
- astns = [astns]
-
- return compile(
- ast.Module(body=astns, type_ignores=[]),
- self.__file, mode='exec')
-
-class DependencyGraph:
- def __init__(self) -> None:
- self._edges = {}
-
- def add(self, dependent, dependee):
- if dependent in self._edges:
- if dependee in self._edges[dependent]:
- return
- self._edges[dependent].add(dependee)
- else:
- self._edges[dependent] = set([dependee])
-
- if self.__check_loop(dependee):
- raise ConfigLoadException(
- f"loop dependency detected: {dependent.get_name()}",
- dependee)
-
- def __check_loop(self, start):
- visited = set()
- q = [start]
- while len(q) > 0:
- current = q.pop()
- if current in visited:
- return True
-
- visited.add(current)
- if current not in self._edges:
- continue
- for x in self._edges[current]:
- q.append(x)
-
- return False
-
- def cascade(self, start):
- q = [start]
- while len(q) > 0:
- current = q.pop()
- if current in self._edges:
- for x in self._edges[current]:
- q.append(x)
- if current != start:
- current.evaluate()
-
-class ConfigTypeFactory:
- def __init__(self) -> None:
- self.__providers = []
-
- def regitser(self, provider_type):
- self.__providers.append(provider_type)
-
- def create(self, typedef):
- for provider in self.__providers:
- if not provider.typedef_matched(typedef):
- continue
- return provider(typedef)
-
- raise ConfigLoadException(
- f"no type provider defined for type: {typedef}", None)
-
-class LConfigEvaluationWrapper:
- def __init__(self, env, node) -> None:
- self.__env = env
- self.__node = node
-
- def __enter__(self):
- self.__env.push_executing_node(self.__node)
- return self
-
- def __exit__(self, type, value, tb):
- self.__env.pop_executing_node()
-
- def evaluate(self):
- return self.__env.evaluate_node(self.__node)
-
-class LConfigEnvironment(Renderable):
- def __init__(self, workspace, config_io) -> None:
- super().__init__()
-
- self.__ws_path = path.abspath(workspace)
- self.__exec_globl = {}
- self.__eval_stack = []
- self.__lc_modules = []
- self.__config_val = {}
- self.__node_table = {}
- self.__deps_graph = DependencyGraph()
- self.__type_fatry = ConfigTypeFactory()
- self.__config_io = config_io
-
- def to_wspath(self, _path):
- _path = path.abspath(_path)
- return path.relpath(_path, self.__ws_path)
-
- def register_builtin_func(self, func_obj):
- call = (lambda *arg, **kwargs:
- func_obj(self, *arg, **kwargs))
-
- self.__exec_globl[func_obj.name] = call
-
- def resolve_module(self, file):
- fo = LConfigFile(self, file)
- self.__lc_modules.insert(0, (fo.root_module()))
-
- def evaluate_node(self, node):
- name = node.get_name()
-
- return self.get_symbol(name)()
-
- def eval_context(self, node):
- return LConfigEvaluationWrapper(self, node)
-
- def push_executing_node(self, node):
- self.__eval_stack.append(node)
-
- def pop_executing_node(self):
- return self.__eval_stack.pop()
-
- def register_node(self, node):
- self.__node_table[node.get_name()] = node
-
- l = {}
- try:
- self.push_executing_node(node)
- exec(node.get_co(), self.__exec_globl, l)
- self.pop_executing_node()
- except Exception as e:
- raise ConfigLoadException("failed to load", node, e)
-
- self.__exec_globl.update(l)
-
- def lookup_node(self, name):
- if name not in self.__node_table:
- raise ConfigLoadException(f"node '{name}' undefined")
- return self.__node_table[name]
-
- def type_factory(self):
- return self.__type_fatry
-
- def update_value(self, key, value):
- self.__config_val[key] = value
-
- def get_symbol(self, name):
- if name not in self.__exec_globl:
- raise ConfigLoadException(f"unknown symbol '{name}'")
- return self.__exec_globl[name]
-
- def callframe_at(self, traverse_level):
- try:
- return self.__eval_stack[traverse_level - 1]
- except:
- return None
-
- def lookup_value(self, key):
- return self.__config_val[key]
-
- def resolve_symbol(self, sym):
- term_node = self.__node_table[sym]
- if isinstance(term_node, LCTermNode):
- if not term_node.is_ready():
- term_node.evaluate()
- return term_node.get_value()
- raise Exception(f"fail to resolve symbol: {sym}, not resolvable")
-
- def dependency(self):
- return self.__deps_graph
-
- def update(self):
- for mod in self.__lc_modules:
- mod.evaluate()
-
- def export(self):
- self.__config_io.export(self, self.__config_val)
-
- def save(self, _path = ".config.json"):
- data = {}
- for mod in self.__lc_modules:
- mod.serialise(data)
-
- with open(_path, 'w') as f:
- json.dump(data, f)
-
- def load(self, _path = ".config.json"):
- if not path.exists(_path):
- return
-
- data = {}
- with open(_path, 'r') as f:
- data.update(json.load(f))
-
- for mod in self.__lc_modules:
- mod.deserialise(data)
-
- self.update()
-
- def render(self, rctx):
- for mod in self.__lc_modules:
- mod.render(rctx)
\ No newline at end of file