-from lcfg.api import (
- RenderContext,
- InteractiveRenderer,
- Renderable,
- ConfigTypeCheckError,
- ConfigLoadException
-)
-from lcfg.common import LConfigEnvironment
-
-import shlex
-from lib.utils import join_path
-from pathlib import Path
-import readline
-
class ShellException(Exception):
    """Error raised for user-level failures inside the interactive shell.

    The first positional argument is the human-readable message; the shell's
    main loop prints ``args[0]`` and keeps running.
    """
-
class ViewElement:
    """Base class for an entry shown in the shell's current view.

    Holds a display ``label`` and the backing config ``node`` (which may be
    ``None`` for synthetic entries). Every operation has a neutral default so
    subclasses override only what they support.
    """

    def __init__(self, label, node) -> None:
        self.label = label
        self.node = node

    def expand(self, sctx):
        """Try to open this element as a collection; the base cannot, so False."""
        return False

    def read(self, sctx):
        """Return the element's value, or None when it is not readable."""
        return None

    def write(self, sctx, val):
        """Store ``val``; return None when the element is not writable."""
        return None

    def get_help(self, sctx):
        """Return the help text of the backing node.

        Elements without a backing node have no help prompt to query, so a
        placeholder is returned instead of failing on ``None``.
        """
        if self.node is None:
            return "N/A"
        return self.node.help_prompt()

    def get_type(self, sctx):
        """Return a printable type description; the base has none."""
        return "N/A"

    def draw(self, sctx, canvas):
        """Render the element; the base draws nothing."""
        pass
-
-
class SubviewLink(ViewElement):
    """View entry for a collection node that can be entered.

    Entering it wipes the target view and lets the stored callback
    repopulate it.
    """

    def __init__(self, label, node, cb) -> None:
        super().__init__(label, node)
        # Invoked with the shell context whenever the link is expanded.
        self.__callback = cb

    def get_type(self, sctx):
        """Describe this entry as a collection."""
        return "Collection: {}".format(self.label)

    def draw(self, sctx, canvas):
        """Print the label in brackets to mark it as enterable."""
        print(" [{}]".format(self.label))

    def expand(self, sctx):
        """Clear the view, run the expansion callback, and report success."""
        sctx.clear_view()
        populate = self.__callback
        populate(sctx)
        return True
-
class RootWrapper(ViewElement):
    """View element standing in for the top of the config tree.

    It carries the fixed label "root" and no backing node; expanding it
    renders the wrapped root object into the given context.
    """

    def __init__(self, root) -> None:
        super().__init__("root", None)
        self.__root = root

    def get_type(self, sctx):
        """The root has no type description."""
        return ""

    def draw(self, sctx, canvas):
        """The root is never listed, so there is nothing to draw."""
        return None

    def expand(self, sctx):
        """Reset the view and let the wrapped root render itself into it."""
        sctx.clear_view()
        self.__root.render(sctx)
        return True
-
class FieldView(ViewElement):
    """View entry for a single config term backed by a node."""

    def __init__(self, label, node) -> None:
        super().__init__(label, node)

    def get_type(self, sctx):
        """Return a two-line description: the term label, then the node type."""
        return "Config term: {}\n{}".format(self.label, self.node.get_type())

    def draw(self, sctx, canvas):
        """Print the label, tagging read-only terms with ' (ro)'."""
        marker = " (ro)" if self.node.read_only() else ""
        print(f" {self.label}{marker}")

    def read(self, sctx):
        """Return the node's current value."""
        return self.node.get_value()

    def write(self, sctx, val):
        """Set the node to ``val`` and return it; return None if read-only."""
        if not self.node.read_only():
            self.node.set_value(val)
            return val
        return None
-
class ShellContext(RenderContext):
    """Render context that records the entries of the level being shown.

    Entries are keyed by the node's own name (the ``label`` argument of the
    add methods is not used). Collections and fields are tracked in separate
    insertion-ordered lists so collections are always drawn first.
    """

    def __init__(self) -> None:
        super().__init__()

        self._view = {}      # node name -> ViewElement
        self._subviews = []  # names of expandable entries, in insertion order
        self._field = []     # names of field entries, in insertion order

    def add_expandable(self, label, node, on_expand_cb):
        """Register ``node`` as an enterable collection under its own name."""
        key = node.get_name()
        self._view[key] = SubviewLink(key, node, on_expand_cb)
        self._subviews.append(key)

    def add_field(self, label, node):
        """Register ``node`` as a config term under its own name."""
        key = node.get_name()
        self._view[key] = FieldView(key, node)
        self._field.append(key)

    def clear_view(self):
        """Forget every registered entry."""
        for container in (self._view, self._subviews, self._field):
            container.clear()

    def redraw(self):
        """Draw all collections, then all fields."""
        for key in [*self._subviews, *self._field]:
            self._view[key].draw(self, None)

    def get_view(self, label):
        """Return the entry registered under ``label``, or None if absent."""
        return self._view.get(label)
-
class InteractiveShell(InteractiveRenderer):
    """Line-oriented shell for browsing and editing a config tree.

    The shell keeps a stack of view elements from the root down to the
    current collection. Input lines are tokenised with ``shlex`` and are
    either a built-in command (``ls``, ``help``, ``type``, ``cd``, ``usage``,
    ``exit``, ``abort``), a bare node path (read its value), or
    ``path = value`` (write a value).
    """

    def __init__(self, root: Renderable) -> None:
        super().__init__()
        # Stack of view elements from the root to the current collection.
        self.__levels = [RootWrapper(root)]
        # Stays True unless the user leaves through 'exit'.
        self.__aborted = True
        self.__sctx = ShellContext()
        # name -> (description, handler, required argument count or None
        # when any number of arguments is accepted and ignored).
        self.__cmd = {
            "ls": (
                "list all config node under current collection",
                lambda *_: self.__sctx.redraw(),
                None,
            ),
            "help": (
                "print help prompt for given name of node",
                self.print_help,
                1,
            ),
            "type": (
                "print the type of a config node",
                self.print_type,
                1,
            ),
            "cd": (
                "navigate to a collection node given a unix-like path",
                self.get_in,
                1,
            ),
            "usage": (
                "print out the usage",
                lambda *_: self.print_usage(),
                None,
            ),
        }

    def get_path(self):
        """Return the absolute, '/'-separated path of the current collection."""
        labels = [level.label for level in self.__levels[1:]]
        return f"/{'/'.join(labels)}"

    def resolve(self, relative):
        """Walk from the root to the node named by ``relative``.

        ``relative`` is joined onto the current path and normalised lexically
        ('.' and '..' are collapsed). The walk uses a scratch context, so the
        visible view is left untouched.

        Returns:
            ``(node, levels)`` where ``levels`` is the list of view elements
            from the root down to and including ``node``.

        Raises:
            ShellException: if a path component does not exist, or a
                component that must be entered is not a collection.
        """
        # Config paths are virtual: normalise them as pure strings. Resolving
        # against the real filesystem would follow unrelated symlinks (e.g. a
        # node named 'bin') and depend on the process working directory.
        import posixpath

        ctx = ShellContext()
        # NOTE(review): join_path is assumed to return a str or path-like
        # rooted at get_path() -- confirm against lib.utils.
        joined = join_path(self.get_path(), relative)
        parts = [
            part
            for part in posixpath.normpath(joined).split("/")
            if part and part != "."
        ]

        node = self.__levels[0]
        levels = [node]
        for part in parts:
            if not node.expand(ctx):
                raise ShellException(f"node is not a collection: {part}")

            node = ctx.get_view(part)
            if node is None:
                raise ShellException(f"no such node: {part}")

            levels.append(node)

        return (node, levels)

    def print_usage(self):
        """Print every built-in command followed by its description."""
        for cmd, (desc, _, _) in self.__cmd.items():
            print("\n".join([
                cmd,
                f" {desc}",
                ""
            ]))

    def print_help(self, node_name):
        """Print the help text of the node at ``node_name``."""
        view, _ = self.resolve(node_name)

        print(view.get_help(self.__sctx))

    def print_type(self, node_name):
        """Print the type description of the node at ``node_name``."""
        view, _ = self.resolve(node_name)

        print(view.get_type(self.__sctx))

    def do_read(self, node_name):
        """Print the value of the node at ``node_name``.

        Raises:
            ShellException: if the node yields no value (``read`` gives None).
        """
        view, _ = self.resolve(node_name)
        rd_val = view.read(self.__sctx)
        if rd_val is None:
            raise ShellException(f"config node {view.label} is not readable")

        print(rd_val)

    def do_write(self, node_name, val):
        """Write ``val`` to the node at ``node_name`` and refresh the view.

        Raises:
            ShellException: if the node rejects the write (``write`` gives
                None).
        """
        view, _ = self.resolve(node_name)
        wr = view.write(self.__sctx, val)
        # Only None signals rejection; a falsy value such as "" is a
        # legitimate, successful write.
        if wr is None:
            raise ShellException(f"config node {view.label} is read only")

        print(f"write: {val}")
        self.do_render()

    def get_in(self, node_name):
        """Make the collection at ``node_name`` the current level."""
        view, lvl = self.resolve(node_name)

        if not view.expand(self.__sctx):
            print(f"{node_name} is not a collection")
            return

        self.__levels = lvl

    def do_render(self):
        """Re-expand the current level into the shell's visible context."""
        curr = self.__levels[-1]
        curr.expand(self.__sctx)

    def __loop(self):
        """Read and execute one input line.

        Returns:
            True to keep prompting, False once the user asked to leave.

        Raises:
            ShellException: for malformed input, wrong argument counts,
                unknown commands and node-level failures.
        """
        prompt = f"config: {self.__levels[-1].label}> "

        line = input(prompt)
        try:
            args = shlex.split(line)
        except ValueError as err:
            # e.g. an unbalanced quote; report it instead of crashing.
            raise ShellException(f"malformed input: {err}") from err

        if not args:
            return True

        cmd, params = args[0], args[1:]

        entry = self.__cmd.get(cmd)
        if entry is not None:
            _, handler, argc = entry
            if argc is not None and len(params) != argc:
                raise ShellException(
                    f"'{cmd}' expects {argc} argument(s), got {len(params)}"
                )
            handler(*params)
            return True

        if cmd == "exit":
            self.__aborted = False
            return False

        if cmd == "abort":
            return False

        # Anything else is a node access; do_read/do_write resolve the path
        # themselves and raise if it does not exist.
        if len(args) == 3 and args[1] == '=':
            self.do_write(args[0], args[2])
            return True

        if len(args) == 1:
            self.do_read(args[0])
            return True

        raise ShellException(f"unrecognised command {line}")

    def render_loop(self):
        """Run the shell until the user leaves.

        Ctrl-C and end-of-input (Ctrl-D) end the session as an abort.

        Returns:
            True if the session ended with 'exit', False if it was aborted.
        """
        self.do_render()
        print("\n".join([
            "Interactive LConfig Shell",
            " type 'usage' to find out how to use",
            " type 'exit' to end (with saving)",
            " type 'abort' to abort (without saving)",
            " type node name directly to read the value",
            " type 'node = val' to set 'val' to 'node'",
            ""
        ]))
        while True:
            try:
                if not self.__loop():
                    break
            except (KeyboardInterrupt, EOFError):
                break
            except ConfigTypeCheckError as e:
                print(e.args[0])
            except ShellException as e:
                print(e.args[0])

        return not self.__aborted
\ No newline at end of file