--- /dev/null
+from .api import (
+ ConfigLoadException,
+ ConfigTypeCheckError,
+ Renderable
+)
+
+from .utils import (
+ extract_decorators,
+ to_displayable
+)
+
+import ast, textwrap
+from abc import abstractmethod
+
+
+
class LCNode(Renderable):
    """Base node of the configuration tree, wrapping one AST fragment.

    Construction prunes the supplied AST through ``prune_astn``, compiles
    the pruned result through the owning file object, and finally registers
    the node with the file object's environment.
    """

    def __init__(self, fo, astn):
        """Create the node.

        Args:
            fo: Owning file object; must provide ``env()`` and
                ``compile_astns()``.
            astn: AST fragment this node is built from.
        """
        super().__init__()

        self._fo = fo
        self._env = fo.env()
        # Prune first: subclasses rewrite the AST (and may create child
        # nodes) before anything is compiled.
        self._astn = self.prune_astn(astn)
        self._co = self.compile_astn()
        # NOTE: the parent slot only exists from this point on, i.e. after
        # prune_astn() and compile_astn() have already run.
        self._parent = None

        self._env.register_node(self)

    # -- construction hooks -------------------------------------------------

    def prune_astn(self, astn):
        """Return the AST to compile; the base implementation keeps it as is."""
        return astn

    def compile_astn(self):
        """Compile the pruned AST through the owning file object."""
        return self._fo.compile_astns(self._astn)

    # -- accessors ----------------------------------------------------------

    def get_co(self):
        """Return the compiled object produced by ``compile_astn``."""
        return self._co

    def get_fo(self):
        """Return the file object this node belongs to."""
        return self._fo

    def get_name(self):
        """Return a fallback name derived from the node's hash."""
        return f"LCNode: {self.__hash__()}"

    # -- tree structure -----------------------------------------------------

    def parent(self):
        """Return the current parent node, or ``None`` when detached."""
        return self._parent

    def set_parent(self, new_parent):
        """Record ``new_parent`` as this node's parent."""
        self._parent = new_parent

    def add_child(self, node):
        """Attach ``node`` as a child; a no-op in the base class."""

    def remove_child(self, node):
        """Detach ``node`` from the children; a no-op in the base class."""

    # -- behaviour hooks ----------------------------------------------------

    @abstractmethod
    def evaluate(self):
        """Evaluate the node; subclasses are expected to override this.

        NOTE(review): ``@abstractmethod`` is only enforced when the class
        hierarchy uses an ABC metaclass -- confirm against ``Renderable``.
        """

    def render(self, rctx):
        """Render the node into ``rctx``; a no-op in the base class."""

    def deserialise(self, dict):
        """Load state from ``dict``; a no-op in the base class."""

    def serialise(self, dict):
        """Store state into ``dict``; a no-op in the base class."""
+
+
+
class LCModuleNode(LCNode):
    """Node for a whole configuration file (an ``ast.Module``).

    Top-level function definitions are turned into child nodes; every other
    top-level statement is kept and compiled as the module's own code.
    """

    def __init__(self, fo, astn: ast.Module):
        """Create the module node for file object ``fo`` and module ``astn``."""
        # The registry must exist before the base initialiser runs, because
        # prune_astn() fills it while the base class is still constructing.
        self.__nodes = {}

        super().__init__(fo, astn)

    def get_name(self):
        """Return a name built from the file object's filename."""
        return f"file: {self._fo.filename()}"

    def prune_astn(self, astn: ast.Module):
        """Split the module body into child nodes and remaining statements.

        Returns:
            The list of top-level statements that are not function
            definitions, in their original order.
        """
        remaining = []

        for stmt in astn.body:
            if isinstance(stmt, ast.FunctionDef):
                child = LCFuncNode.construct(self._fo, stmt)
                child.set_parent(self)

                self.add_child(child)
            else:
                remaining.append(stmt)

        return remaining

    def evaluate(self):
        """Evaluate every child inside this node's evaluation context."""
        with self._env.eval_context(self):
            # Iterate over a snapshot so the registry may change meanwhile.
            for child in tuple(self.__nodes.values()):
                child.evaluate()

    def add_child(self, node):
        """Register ``node`` as a direct child of the module."""
        self.__nodes[node] = node

    def remove_child(self, node):
        """Drop ``node`` from the registry if it is present."""
        if node in self.__nodes:
            del self.__nodes[node]

    def deserialise(self, dict):
        """Let every child load its state from ``dict``."""
        for child in self.__nodes:
            child.deserialise(dict)

    def serialise(self, dict):
        """Let every child store its state into ``dict``."""
        for child in self.__nodes:
            child.serialise(dict)

    def render(self, rctx):
        """Let every child render itself into ``rctx``."""
        for child in self.__nodes:
            child.render(rctx)
+
class LCFuncNode(LCNode):
    """Node backed by a decorated function definition.

    Concrete subclasses name the decorator they correspond to through
    ``mapped_name()``; ``construct()`` uses that to pick the node class for
    a given function definition.
    """

    def __init__(self, fo, astn) -> None:
        """Create the node for file object ``fo`` and function ``astn``."""
        # These attributes are filled in by prune_astn(), which the base
        # initialiser invokes, so they must exist before super().__init__().
        self._decors = {}
        self._name = None
        self._help = ""
        self._display_name = None
        self._enabled = True

        super().__init__(fo, astn)

    def prune_astn(self, astn: ast.FunctionDef):
        """Harvest name, help text and decorators, then strip the decorators.

        Args:
            astn: The function definition backing this node.

        Returns:
            The same ``astn`` object with its decorator list emptied.
        """
        self._name = astn.name
        self._display_name = to_displayable(self._name)

        # ast.get_docstring() only yields real string docstrings, so a body
        # that starts with another constant (``...``, ``42``) no longer ends
        # up in textwrap.dedent() and raises TypeError. clean=False keeps
        # the raw text so dedent() behaves exactly as before for strings.
        doc = ast.get_docstring(astn, clean=False)
        if doc is not None:
            self._help = textwrap.dedent(doc)

        for name, args, kwargs in extract_decorators(astn):
            self._decors[name] = (args, kwargs)

        # The first positional argument of the mapped decorator, when given,
        # overrides the display name. A missing decorator (e.g. the base
        # class, whose mapped name is None) keeps the derived default.
        mapped = self._decors.get(self.mapped_name())
        if mapped is not None:
            args, _ = mapped
            if args:
                self._display_name = args[0]

        astn.decorator_list.clear()
        return astn

    def get_name(self):
        """Return the name of the underlying function."""
        return self._name

    def get_display_name(self):
        """Return the display name (decorator override or derived default)."""
        return self._display_name

    def enabled(self):
        """Return whether this node and its function-node ancestors are enabled."""
        if isinstance(self._parent, LCFuncNode):
            return self._enabled and self._parent.enabled()
        return self._enabled

    def help_prompt(self):
        """Return the dedented docstring of the function, or ``""``."""
        return self._help

    def evaluate(self):
        """Evaluate the node and update its enabled state from the result.

        A result of ``None`` counts as enabled; any other result is stored
        as the enabled state unchanged.
        """
        with self._env.eval_context(self) as evalc:
            result = evalc.evaluate()
            self._enabled = True if result is None else result

    @staticmethod
    def mapped_name(self=None):
        """Return the decorator name this node class maps to (none here).

        The parameter is ignored. It is retained, with a default, only so
        that the historical one-argument call shape keeps working while the
        zero-argument call used by ``prune_astn`` and ``construct`` no
        longer raises ``TypeError`` on the base class.
        """
        return None

    @staticmethod
    def construct(fo, astn: ast.FunctionDef):
        """Instantiate the node class whose decorator is present on ``astn``.

        Raises:
            ConfigLoadException: If no known decorator is found.
        """
        # Candidates are tried in this order; the first match wins.
        nodes = [
            LCCollectionNode,
            LCGroupNode,
            LCTermNode
        ]

        for node in nodes:
            if extract_decorators(astn, node.mapped_name(), True):
                return node(fo, astn)

        raise ConfigLoadException(
            f"unknown type for astn type: {type(astn)}")

    def set_parent(self, new_parent):
        """Move this node under ``new_parent``, keeping child lists in sync.

        Passing ``None`` detaches the node from its current parent.
        """
        # Compare against None explicitly so a parent that happens to be
        # falsy is still detached from correctly.
        if self._parent is not None:
            self._parent.remove_child(self)

        if new_parent is not None:
            new_parent.add_child(self)
        super().set_parent(new_parent)
+
+
class LCTermNode(LCFuncNode):
    """Leaf node holding a single typed configuration value."""

    def __init__(self, fo, astn) -> None:
        """Create the term for file object ``fo`` and function ``astn``."""
        # Set before super().__init__() because prune_astn() runs inside it
        # and assigns _rdonly.
        self._value = None
        self._default = None
        self._type = None
        self._rdonly = False

        super().__init__(fo, astn)

    @staticmethod
    def mapped_name():
        """Return the decorator name that marks a term."""
        return "Term"

    def prune_astn(self, astn: ast.FunctionDef):
        """Run the common pruning and record whether the term is read-only."""
        astn = super().prune_astn(astn)

        self._rdonly = "ReadOnly" in self._decors

        return astn

    def set_type(self, type_def):
        """Create the term's type from ``type_def`` via the type factory."""
        self._type = self._env.type_factory().create(type_def)

    def get_type(self):
        """Return the term's type object, or ``None`` when not yet typed."""
        return self._type

    def __require_type(self):
        """Return the term's type, failing uniformly when it is missing.

        Raises:
            ConfigLoadException: If no type has been set.
        """
        if not self._type:
            raise ConfigLoadException(
                f"config: {self._name} must be typed", self)
        return self._type

    def __assert_type(self, val):
        """Check ``val`` against the term's type.

        Raises:
            ConfigLoadException: If the term has no type.
            ConfigTypeCheckError: If ``val`` fails the type check.
        """
        if self.__require_type().check(val):
            return

        raise ConfigTypeCheckError(
            f"value: {val} does not match type {self._type}", self)

    def set_value(self, val):
        """Assign a new value; silently ignored for read-only terms.

        String input is first converted with the type's ``parse_input``.

        Raises:
            ConfigLoadException: If the term has no type.
            ConfigTypeCheckError: If the value fails the type check.
        """
        if self._rdonly:
            return

        if isinstance(val, str):
            # Go through __require_type() so an untyped term reports
            # "must be typed" instead of failing with AttributeError.
            val = self.__require_type().parse_input(val)

        self.__assert_type(val)
        self._value = val
        self.__update_value()
        # presumably re-evaluates nodes depending on this term -- confirm
        # against the environment's dependency object.
        self._env.dependency().cascade(self)

    def set_default(self, val):
        """Set the default value; read-only terms adopt it as their value.

        Raises:
            ConfigLoadException: If the term has no type.
            ConfigTypeCheckError: If the value fails the type check.
        """
        self.__assert_type(val)
        self._default = val

        if self._rdonly:
            self._value = val

    def get_value(self):
        """Return the currently stored value."""
        return self._value

    def evaluate(self):
        """Evaluate the function node, then republish the term's value."""
        super().evaluate()
        self.__update_value()

    def read_only(self):
        """Return whether the term carries the ``ReadOnly`` decorator."""
        return self._rdonly

    def render(self, rctx):
        """Add the term as a field to ``rctx`` when it is enabled."""
        if self.enabled():
            rctx.add_field(self._display_name, self)

    def serialise(self, dict):
        """Store the serialised value in ``dict`` under the term's name.

        Raises:
            ConfigLoadException: If the term has no type.
        """
        s_val = self.__require_type().serialise(self._value)
        dict[self._name] = s_val

    def deserialise(self, dict):
        """Load the value from ``dict`` if the term's name is present.

        Raises:
            ConfigLoadException: If the term has no type.
            ConfigTypeCheckError: If the loaded value fails the type check.
        """
        if self._name not in dict:
            return

        s_val = dict[self._name]
        v = self.__require_type().deserialise(s_val)
        self.__assert_type(v)
        # NOTE(review): this overwrites the value even for read-only terms
        # and does not publish it to the environment -- confirm intended.
        self._value = v

    def __update_value(self):
        """Resolve the effective value and publish it to the environment.

        A disabled term publishes ``None`` and keeps its stored value.
        Otherwise a missing value falls back to the default.

        Raises:
            ConfigLoadException: If no value is available and the term is
                untyped or its type does not allow ``None``.
        """
        v = self._value

        if not self.enabled():
            self._env.update_value(self._name, None)
            return

        if v is None:
            v = self._default

        if v is None and not self.__require_type().allow_none():
            raise ConfigLoadException(
                f"config: {self._name} must have a value", self)

        self._value = v
        self._env.update_value(self._name, v)
+
+
+
+
class LCGroupNode(LCFuncNode):
    """Function node that owns child nodes defined inside its body."""

    def __init__(self, fo, astn) -> None:
        """Create the group for file object ``fo`` and function ``astn``."""
        # Must exist before the base initialiser triggers prune_astn().
        self._children = {}

        super().__init__(fo, astn)

    @staticmethod
    def mapped_name():
        """Return the decorator name that marks a group."""
        return "Group"

    def prune_astn(self, astn: ast.FunctionDef):
        """Lift nested function definitions out of the body as child nodes.

        Returns:
            ``astn`` with its body reduced to the non-function statements,
            or to a single ``pass`` when nothing else is left.
        """
        astn = super().prune_astn(astn)

        kept = []
        for stmt in astn.body:
            if isinstance(stmt, ast.FunctionDef):
                child = LCFuncNode.construct(self._fo, stmt)
                child.set_parent(self)
                self._children[child] = child
            else:
                kept.append(stmt)

        if not kept:
            # A function body cannot be empty, so substitute a placeholder.
            placeholder = ast.Pass(
                lineno=astn.lineno + 1,
                col_offset=astn.col_offset
            )
            kept.append(placeholder)

        astn.body = kept
        return astn

    def evaluate(self):
        """Evaluate the group, then its children unless it stayed disabled."""
        was_enabled = super().enabled()
        super().evaluate()
        now_enabled = super().enabled()

        # Children are skipped only when the group is disabled and its
        # enabled state compares equal to the one before evaluation.
        if not now_enabled and was_enabled == now_enabled:
            return

        with self._env.eval_context(self):
            # Iterate over a snapshot so the child table may change meanwhile.
            for child in list(self._children.values()):
                child.evaluate()

    def render(self, rctx):
        """Render every child into ``rctx``."""
        for member in self._children.values():
            member.render(rctx)

    def serialise(self, dict):
        """Store the children's state in a sub-dict under the group's name."""
        nested = {}
        for member in self._children.values():
            member.serialise(nested)

        dict[self._name] = nested

    def deserialise(self, dict):
        """Load the children's state from the sub-dict named after the group."""
        if self._name in dict:
            nested = dict[self._name]
            for member in self._children.values():
                member.deserialise(nested)

    def add_child(self, node):
        """Register ``node`` as a child of the group."""
        self._children[node] = node

    def remove_child(self, node):
        """Drop ``node`` from the children if it is present."""
        if node in self._children:
            del self._children[node]
+
+
class LCCollectionNode(LCGroupNode):
    """Group that is rendered as an expandable entry."""

    def __init__(self, fo, astn) -> None:
        """Create the collection for file object ``fo`` and function ``astn``."""
        super().__init__(fo, astn)

    @staticmethod
    def mapped_name():
        """Return the decorator name that marks a collection."""
        return "Collection"

    def render(self, rctx):
        """Register an expandable entry whose callback renders the children."""
        # Zero-argument super() is resolved here, in the method itself, and
        # captured for use inside the callback.
        group_render = super()

        def _render_children(_ctx):
            return group_render.render(_ctx)

        rctx.add_expandable(self._display_name, self, _render_children)
\ No newline at end of file