X-Git-Url: https://scm.lunaixsky.com/lunaix-os.git/blobdiff_plain/d1b1c8d9119229dbeed06cd252917e54a1cb77f6..1025235c72c31f7fa7b648c0e32ddcaa68a8f66a:/lunaix-os/scripts/build-tools/lcfg/lcnodes.py diff --git a/lunaix-os/scripts/build-tools/lcfg/lcnodes.py b/lunaix-os/scripts/build-tools/lcfg/lcnodes.py new file mode 100644 index 0000000..cf869b0 --- /dev/null +++ b/lunaix-os/scripts/build-tools/lcfg/lcnodes.py @@ -0,0 +1,390 @@ +from .api import ( + ConfigLoadException, + ConfigTypeCheckError, + Renderable +) + +from .utils import ( + extract_decorators, + to_displayable +) + +import ast, textwrap +from abc import abstractmethod + + + +class LCNode(Renderable): + def __init__(self, fo, astn): + super().__init__() + + self._fo = fo + self._env = fo.env() + self._astn = self.prune_astn(astn) + self._co = self.compile_astn() + self._parent = None + + self._env.register_node(self) + + def prune_astn(self, astn): + return astn + + def compile_astn(self): + return self._fo.compile_astns(self._astn) + + def get_co(self): + return self._co + + def get_fo(self): + return self._fo + + def set_parent(self, new_parent): + self._parent = new_parent + + def parent(self): + return self._parent + + def add_child(self, node): + pass + + def remove_child(self, node): + pass + + def get_name(self): + return f"LCNode: {self.__hash__()}" + + @abstractmethod + def evaluate(self): + pass + + def render(self, rctx): + pass + + def deserialise(self, dict): + pass + + def serialise(self, dict): + pass + + + +class LCModuleNode(LCNode): + def __init__(self, fo, astn: ast.Module): + self.__nodes = {} + + super().__init__(fo, astn) + + def get_name(self): + return f"file: {self._fo.filename()}" + + def prune_astn(self, astn: ast.Module): + general_exprs = [] + + for b in astn.body: + if not isinstance(b, ast.FunctionDef): + general_exprs.append(b) + continue + + node = LCFuncNode.construct(self._fo, b) + node.set_parent(self) + + self.add_child(node) + + return general_exprs + + def evaluate(self): + with self._env.eval_context(self) as evalc: + ls = list(self.__nodes.values()) + for node in ls: + node.evaluate() + + def add_child(self, node): + self.__nodes[node] = node + + def remove_child(self, node): + if node in self.__nodes: + del self.__nodes[node] + + def deserialise(self, dict): + for node in self.__nodes: + node.deserialise(dict) + + def serialise(self, dict): + for node in self.__nodes: + node.serialise(dict) + + def render(self, rctx): + for node in self.__nodes: + node.render(rctx) + +class LCFuncNode(LCNode): + def __init__(self, fo, astn) -> None: + self._decors = {} + self._name = None + self._help = "" + self._display_name = None + self._enabled = True + + super().__init__(fo, astn) + + def prune_astn(self, astn: ast.FunctionDef): + self._name = astn.name + self._display_name = to_displayable(self._name) + + maybe_doc = astn.body[0] + if isinstance(maybe_doc, ast.Expr): + if isinstance(maybe_doc.value, ast.Constant): + self._help = maybe_doc.value.value + self._help = textwrap.dedent(self._help) + + decors = extract_decorators(astn) + for name, args, kwargs in decors: + self._decors[name] = (args, kwargs) + + (args, _) = self._decors[self.mapped_name()] + if args: + self._display_name = args[0] + + astn.decorator_list.clear() + return astn + + def get_name(self): + return self._name + + def get_display_name(self): + return self._display_name + + def enabled(self): + if isinstance(self._parent, LCFuncNode): + return self._enabled and self._parent.enabled() + return self._enabled + + def help_prompt(self): + return self._help + + def evaluate(self): + with self._env.eval_context(self) as evalc: + result = evalc.evaluate() + self._enabled = True if result is None else result + + @staticmethod + def mapped_name(self): + return None + + @staticmethod + def construct(fo, astn: ast.FunctionDef): + nodes = [ + LCCollectionNode, + LCGroupNode, + LCTermNode + ] + + for node in nodes: + if extract_decorators(astn, node.mapped_name(), True): + return node(fo, astn) + + raise ConfigLoadException( + f"unknown type for astn type: {type(astn)}") + + def set_parent(self, new_parent): + if self._parent: + self._parent.remove_child(self) + + new_parent.add_child(self) + super().set_parent(new_parent) + + +class LCTermNode(LCFuncNode): + def __init__(self, fo, astn) -> None: + self._value = None + self._default = None + self._type = None + self._rdonly = False + + super().__init__(fo, astn) + + @staticmethod + def mapped_name(): + return "Term" + + def prune_astn(self, astn: ast.FunctionDef): + astn = super().prune_astn(astn) + + self._rdonly = "ReadOnly" in self._decors + + return astn + + def set_type(self, type_def): + self._type = self._env.type_factory().create(type_def) + + def get_type(self): + return self._type + + def __assert_type(self, val): + if not self._type: + raise ConfigLoadException( + f"config: {self._name} must be typed", self) + + if self._type.check(val): + return + + raise ConfigTypeCheckError( + f"value: {val} does not match type {self._type}", self) + + def set_value(self, val): + if self._rdonly: + return + + if isinstance(val, str): + val = self._type.parse_input(val) + + self.__assert_type(val) + self._value = val + self.__update_value() + self._env.dependency().cascade(self) + + def set_default(self, val): + self.__assert_type(val) + self._default = val + + if self._rdonly: + self._value = val + + def get_value(self): + return self._value + + def evaluate(self): + super().evaluate() + self.__update_value() + + def read_only(self): + return self._rdonly + + def render(self, rctx): + if self.enabled(): + rctx.add_field(self._display_name, self) + + def serialise(self, dict): + s_val = self._type.serialise(self._value) + dict[self._name] = s_val + + def deserialise(self, dict): + if self._name not in dict: + return + + s_val = dict[self._name] + v = self._type.deserialise(s_val) + self.__assert_type(v) + self._value = v + + + def __update_value(self): + v = self._value + + if not self.enabled(): + self._env.update_value(self._name, None) + return + + if v is None: + v = self._default + + if v is None and not self._type.allow_none(): + raise ConfigLoadException( + f"config: {self._name} must have a value", self) + + self._value = v + self._env.update_value(self._name, v) + + + + +class LCGroupNode(LCFuncNode): + def __init__(self, fo, astn) -> None: + self._children = {} + + super().__init__(fo, astn) + + @staticmethod + def mapped_name(): + return "Group" + + def prune_astn(self, astn: ast.FunctionDef): + astn = super().prune_astn(astn) + + other_exprs = [] + for expr in astn.body: + if not isinstance(expr, ast.FunctionDef): + other_exprs.append(expr) + continue + + node = LCFuncNode.construct(self._fo, expr) + node.set_parent(self) + self._children[node] = node + + if not other_exprs: + other_exprs.append( + ast.Pass( + lineno=astn.lineno + 1, + col_offset=astn.col_offset + ) + ) + + astn.body = other_exprs + return astn + + def evaluate(self): + old_enable = super().enabled() + super().evaluate() + + new_enabled = super().enabled() + if not new_enabled and old_enable == new_enabled: + return + + with self._env.eval_context(self) as evalc: + children = list(self._children.values()) + for child in children: + child.evaluate() + + def render(self, rctx): + for child in self._children.values(): + child.render(rctx) + + def serialise(self, dict): + sub_dict = {} + for child in self._children.values(): + child.serialise(sub_dict) + + dict[self._name] = sub_dict + + def deserialise(self, dict): + if self._name not in dict: + return + + sub_dict = dict[self._name] + for child in self._children.values(): + child.deserialise(sub_dict) + + def add_child(self, node): + self._children[node] = node + + def remove_child(self, node): + if node in self._children: + del self._children[node] + + +class LCCollectionNode(LCGroupNode): + def __init__(self, fo, astn) -> None: + super().__init__(fo, astn) + + @staticmethod + def mapped_name(): + return "Collection" + + def render(self, rctx): + _super = super() + rctx.add_expandable( + self._display_name, + self, + lambda _ctx: + _super.render(_ctx) + ) \ No newline at end of file