-use("kernel")
-use("libs")
-use("arch")
-use("hal")
+from . import kernel, libs, arch, hal
-headers([
- "includes",
- "includes/usr"
-])
+src.h += "includes", "includes/usr"
-# compliation setting
+flag.cc += "-ffreestanding", "-fno-pie"
-compile_opts([
- "-ffreestanding",
- "-fno-pie"
-])
+flag.cc += (
+ "-Wall -Wextra -Werror",
+ "-Wno-unknown-pragmas",
+ "-Wno-unused-function",
+ "-Wno-unused-variable",
+ "-Wno-unused-but-set-variable",
+ "-Wno-unused-parameter",
+ "-Wno-discarded-qualifiers",
+ "-Werror=incompatible-pointer-types"
+)
-linking_opts([
- "-nostdlib",
- "-nolibc",
- "-z noexecstack",
- "-no-pie",
-])
+flag.cc += (
+ "-fno-omit-frame-pointer",
+ "-finline-small-functions",
+)
-linking_opts([
- "-Wl,--build-id=none"
-])
\ No newline at end of file
+flag.ld += "-nostdlib", "-nolibc", "-z noexecstack", "-no-pie", "-Wl,--build-id=none"
\ No newline at end of file
-import time
from datetime import datetime, date
-include("kernel")
-include("arch")
-include("hal")
+from . import kernel, arch, hal
-@Term("Kernel Version")
-@ReadOnly
-def lunaix_ver():
+@"Kernel Version"
+@readonly
+def lunaix_ver() -> str:
"""
Lunaix kernel version
"""
-
- type(str)
today = date.today()
year = today.year
start_of_year = datetime(year, 1, 1).date()
seq_num = (today - start_of_year).days
- default("%s v0.%d%d"%(v(arch), year - 2000, seq_num))
+ return "%s v0.%d%d"%(arch.val, year - 2000, seq_num)
-@Collection("Kernel Debug and Testing")
+@"Kernel Debug and Testing"
def debug_and_testing():
"""
General settings for kernel debugging feature
"""
- @Term("Supress assertion")
- def no_assert():
+ @"Supress assertion"
+ def no_assert() -> bool:
"""
Supress all assertion fail activity.
Note: Enable this is highly NOT recommended and would result system
extermly unstable
"""
- type(bool)
- default(False)
- @Term("Report on stalled thread")
- def check_stall():
+ return False
+
+ @"Report on stalled thread"
+ def check_stall() -> bool:
"""
Check and report on any thread that spend too much time in kernel.
"""
- type(bool)
- default(True)
+ return True
- @Term("Max kernel time allowance")
- def stall_timeout():
+ @"Max kernel time allowance"
+ def stall_timeout() -> int:
"""
Set the maximum time (in seconds) spent in kernel before considered
to be stalled.
"""
+ require(check_stall)
- type(int)
- default(10)
-
- return v(check_stall)
+ return 10
- @Term("Max number of preemptions")
- def stall_max_preempts():
+ @"Max number of preemptions"
+ def stall_max_preempts() -> int:
"""
Set the maximum number of preemptions that a task can take
before it is considered to be stucked in some loops.
Setting it to 0 disable this check
"""
+ require(check_stall)
- type(int)
- default(0)
+ return 0
- return v(check_stall)
\ No newline at end of file
-use("generic")
+from . import generic
-use({
- config("arch"): {
- "i386": "x86",
- "x86_64": "x86",
- "aarch64": "arm",
- "rv64": "riscv"
- }
-})
\ No newline at end of file
+match config.arch:
+ case "i386" | "x86_64":
+ from . import x86
+ # case "aarch64":
+ # from . import arm64
+ # case "rv64":
+ # from . import rv64
+ case _:
+ from . import x86
-include("x86/LConfig")
+from . import x86
-@Collection("Platform")
+@"Platform"
def architecture_support():
"""
Config ISA related features
"""
- @Term("Architecture")
- def arch():
+ @flag
+ def arch_x86_32() -> bool:
+ return arch.val == "i386"
+
+ @flag
+ def arch_x86_64() -> bool:
+ return arch.val == "x86_64"
+
+ @flag
+ def arch_x86() -> bool:
+ return arch.val in ["x86_64", "i386"]
+
+ @"Architecture"
+ def arch() -> "i386" | "x86_64":
"""
Config ISA support
"""
- # type(["i386", "x86_64", "aarch64", "rv64"])
- type(["i386", "x86_64"])
- default("x86_64")
-
- env_val = env("ARCH")
- if env_val:
- set_value(env_val)
+ _arch = env("ARCH")
+ return _arch if _arch else "x86_64"
- @Term("Base operand size")
- @ReadOnly
- def arch_bits():
+ @"Base operand size"
+ @readonly
+ def arch_bits() -> 32 | 64:
"""
Defines the base size of a general register of the
current selected ISA.
This the 'bits' part when we are talking about a CPU
"""
- type(["64", "32"])
- match v(arch):
+ match arch.val:
case "i386":
- default("32")
+ return 32
case "aarch64":
- default("64")
+ return 64
case "rv64":
- default("64")
+ return 64
case "x86_64":
- default("64")
+ return 64
case _:
- default("32")
+ return 32
\ No newline at end of file
-headers("includes")
+src.h += "includes"
-sources([
+src.c += (
"bootmem.c",
"trace.c",
"vmutils.c",
"procvm.c",
"arch.c",
"kremap.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-use("hal")
+from . import hal
-sources([
+src.c += (
"exceptions/interrupts.c",
"exceptions/isrdef.c",
"exceptions/intrhnds.S",
-])
+)
-sources([
+src.c += (
"boot/mb_parser.c",
"boot/kpt_setup.c",
"boot/boot_helper.c",
-])
+)
-sources([
+src.c += (
"mm/fault.c",
"mm/tlb.c",
"mm/pmm.c",
"mm/gdt.c",
"mm/vmutils.c"
-])
+)
-sources([
+src.c += (
"klib/fast_crc.c",
"klib/fast_str.c",
"exec/exec.c",
"hart.c",
"failsafe.S",
"syscall_lut.S"
-])
+)
-sources({
- config("arch"): {
- "x86_64": [
+match config.arch:
+ case "x86_64":
+ src.c += (
"hart64.c",
"syscall64.S",
"exceptions/interrupt64.S",
"boot/x86_64/prologue64.S",
"boot/x86_64/kremap64.c",
"exec/elf64.c"
- ],
- "i386": [
+ )
+ case "i386":
+ src.c += (
"hart32.c",
"syscall32.S",
"exceptions/interrupt32.S",
"boot/i386/prologue32.S",
"boot/i386/kremap32.c",
"exec/elf32.c"
- ]
- }
-})
+ )
-headers([
- "includes"
-])
+src.h += "includes"
-
-if config("arch") == "x86_64":
- compile_opts([
+if config.arch == "x86_64":
+ flag.cc += (
"-m64",
"-fno-unwind-tables",
"-fno-asynchronous-unwind-tables",
"-mcmodel=large"
- ])
- linking_opts([
- "-m64",
- ])
+ )
+ flag.ld += "-m64"
else:
- compile_opts("-m32")
- linking_opts("-m32")
+ flag.cc += "-m32"
+ flag.ld += "-m32"
-if not config("x86_enable_sse_feature"):
- compile_opts("-mno-sse")
\ No newline at end of file
+if not config.x86_enable_sse_feature:
+ flag.cc += "-mno-sse"
\ No newline at end of file
-@Group
+@(parent := architecture_support)
def x86_configurations():
-
- add_to_collection(architecture_support)
- @Term("Use SSE2/3/4 extension")
- def x86_enable_sse_feature():
+ require(arch_x86)
+
+ @flag
+ def x86_bl_mb() -> bool:
+ return x86_bl.val == "mb"
+
+ @flag
+ def x86_bl_mb2() -> bool:
+ return x86_bl.val == "mb2"
+
+ @"Use SSE2/3/4 extension"
+ def x86_enable_sse_feature() -> bool:
"""
Config whether to allow using SSE feature for certain
optimization
"""
- type(bool)
- default(False)
+ return False
-
- @Term("Bootloader Model")
- def x86_bl():
+ @"Bootloader Model"
+ def x86_bl() -> "mb" | "mb2":
"""
Select the bootloader interface
none: do not use any interface
"""
- type(["mb"])
- # type(["mb", "mb2", "none"])
- default("mb")
-
-
- return v(arch) in ["i386", "x86_64"]
\ No newline at end of file
+ return "mb"
\ No newline at end of file
-sources([
+src.c += (
"apic.c",
"rngx86.c",
"cpu.c",
"apic_timer.c",
"mc146818a.c",
"pci.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-use("acpi")
-use("ahci")
-use("char")
-use("gfxa")
-use("rtc")
-use("term")
-use("timer")
-use("bus")
+from . import acpi, ahci, char, gfxa, rtc, term, timer, bus
-if config("use_devicetree"):
- use("devtree")
+if config.use_devicetree:
+ from . import devtree
-sources([
- "irq.c"
-])
\ No newline at end of file
+src.c += "irq.c"
\ No newline at end of file
-include("char")
-include("bus")
-include("ahci")
+from . import char, bus, ahci
-@Collection("Devices & Peripherials")
+@"Devices & Peripherials"
def hal():
""" Lunaix hardware asbtraction layer """
- @Term("Devicetree for hardware discovery")
- def use_devicetree():
+ @"Devicetree for hardware discovery"
+ def use_devicetree() -> bool:
"""
Decide whether to use Devicetree for platform
resource topology sensing.
way.
"""
- type(bool)
- default(not v(arch).startswith("x86"))
+ return arch.val not in ["x86_64", "i386"]
- @ReadOnly
- @Term("Maximum size of device tree blob (in KiB)")
- def dtb_maxsize():
+ @"Maximum size of device tree blob (in KiB)"
+ @readonly
+ def dtb_maxsize() -> int:
"""
Maximum size for a firmware provided device tree blob
"""
+ require(use_devicetree)
- type(int)
- default(256)
-
- return v(use_devicetree)
\ No newline at end of file
+ return 256
\ No newline at end of file
-sources([
+src.c += (
"parser/madt_parser.c",
"parser/mcfg_parser.c",
"acpi.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-if config("ahci_enable"):
- sources([
+if config.ahci_enable:
+ src.c += (
"ahci_pci.c",
"hbadev_export.c",
"ahci.c",
"io_event.c",
"atapi.c",
"ata.c"
- ])
\ No newline at end of file
+ )
\ No newline at end of file
-@Collection("AHCI")
+@"AHCI"
+@(parent := hal)
def sata_ahci():
- add_to_collection(hal)
-
- @Term("Enable AHCI support")
- def ahci_enable():
+ @"Enable AHCI support"
+ def ahci_enable() -> bool:
""" Enable the support of SATA AHCI.
Must require PCI at current stage """
+ require(pci_enable)
- type(bool)
- default(True)
-
- if not v(pci_enable):
- set_value(False)
+ return True
\ No newline at end of file
-if config("pci_enable"):
- sources("pci.c")
\ No newline at end of file
+if config.pci_enable:
+ src.c += "pci.c"
\ No newline at end of file
-@Collection("Buses & Interconnects")
+@"Buses & Interconnects"
+@(parent := hal)
def bus_if():
""" System/platform bus interface """
- add_to_collection(hal)
-
- @Term("PCI")
- def pci_enable():
+ @"PCI"
+ def pci_enable() -> bool:
""" Peripheral Component Interconnect (PCI) Bus """
- type(bool)
- default(True)
+ return True
- @Term("PCI Express")
- def pcie_ext():
+ @"PCI Express"
+ def pcie_ext() -> bool:
""" Enable support of PCI-Express extension """
- type(bool)
- default(False)
+ require(pci_enable)
- return v(pci_enable)
+ return False
- @Term("Use PMIO for PCI")
- def pci_pmio():
+ @"Use PMIO for PCI"
+ def pci_pmio() -> bool:
""" Use port-mapped I/O interface for controlling PCI """
- type(bool)
-
- has_pcie = v(pcie_ext)
- is_x86 = v(arch) in [ "i386", "x86_64" ]
-
- default(not has_pcie)
-
- if not is_x86 or has_pcie:
- set_value(False)
+ require(not pcie_ext and pci_enable)
+ require(arch_x86)
- return is_x86 and v(pci_enable)
\ No newline at end of file
+ return arch.val in [ "i386", "x86_64" ]
-use("uart")
+from . import uart
-sources([
+if config.vga_console:
+ src.c += "lxconsole.c"
+
+if config.chargame_console:
+ src.c += "chargame.c"
+
+src.c += (
"devnull.c",
"serial.c",
"devzero.c",
-])
-
-if config("vga_console"):
-
- sources("lxconsole.c")
-
-if config("chargame_console"):
-
- sources("chargame.c")
\ No newline at end of file
+)
\ No newline at end of file
-include("uart")
+from . import uart
-@Collection("Character Devices")
+@"Character Devices"
+@(parent := hal)
def char_device():
""" Controlling support of character devices """
- add_to_collection(hal)
-
- @Term("VGA 80x25 text-mode console")
- def vga_console():
+ @"VGA 80x25 text-mode console"
+ def vga_console() -> bool:
""" Enable VGA console device (text mode only) """
- type(bool)
- default(True)
+ return True
- @Term("VGA character game device")
- def chargame_console():
+ @"VGA character game device"
+ def chargame_console() -> bool:
"""
Enable VGA Charactor Game console device (text mode only)
You normally don't need to include this, unless you want some user space fun ;)
"""
- type(bool)
- default(False)
\ No newline at end of file
+ return False
\ No newline at end of file
-sources([
+src.c += (
"16x50_base.c",
"16x50_pmio.c",
"16x50_mmio.c",
"16x50_dev.c",
-])
+)
-if config("xt_16x50"):
- sources("16x50_isa.c")
+if config.xt_16x50:
+ src.c += "16x50_isa.c"
-if config("pci_16x50"):
- sources("16x50_pci.c")
\ No newline at end of file
+if config.pci_16x50:
+ src.c += "16x50_pci.c"
\ No newline at end of file
-@Collection("16x50 Serial Controller")
+@"16x50 Serial Controller"
+@(parent := char_device)
def uart_16x50():
""" 16x50 serial controller """
- # hal/char/LConfig::char_device
- add_to_collection(char_device)
-
- @Term("16x50 XT-Compat")
- def xt_16x50():
+ @"16x50 XT-Compat"
+ def xt_16x50() -> bool:
""" Enable the 16x50 for PC-compatible platform """
- type(bool)
-
- default(True)
-
- is_x86 = v(arch) in ["i386", "x86_64"]
- if not is_x86:
- set_value(False)
-
- return is_x86
+ return arch.val in ["i386", "x86_64"]
- @Term("16x50 PCI")
- def pci_16x50():
+ @"16x50 PCI"
+ def pci_16x50() -> bool:
""" Enable the support of PCI 16x50 """
- type(bool)
- default(True)
\ No newline at end of file
+ return True
\ No newline at end of file
-sources([
+src.c += (
"dt_interrupt.c",
"dt.c",
"dtm.c",
"dtspec.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-use("vga")
+from . import vga
-sources([
- "gfxm.c"
-])
\ No newline at end of file
+src.c += "gfxm.c"
\ No newline at end of file
-sources([
+src.c += (
"vga_pmio_ops.c",
"vga_gfxm_ops.c",
"vga.c",
"vga_mmio_ops.c",
"vga_pci.c",
"vga_rawtty.c",
-])
\ No newline at end of file
+)
\ No newline at end of file
-sources([
- "rtc_device.c"
-])
\ No newline at end of file
+src.c += "rtc_device.c"
\ No newline at end of file
-sources([
+src.c += (
"term.c",
"console.c",
"term_io.c",
"default_ops.c",
"lcntls/ansi_cntl.c",
"lcntls/lcntl.c",
-])
\ No newline at end of file
+)
\ No newline at end of file
-sources([
- "timer_device.c"
-])
\ No newline at end of file
+src.c += "timer_device.c"
\ No newline at end of file
-use("block")
-use("debug")
-use("device")
-use("ds")
-use("exe")
-use("fs")
-use("mm")
-use("process")
+from . import block, debug, device, ds, exe, fs, mm, process
-sources([
+src.c += (
"boot_helper.c",
"kcmd.c",
"kinit.c",
"kprint/kp_records.c",
"kprint/kprintf.c",
"time/clock.c",
- "time/timer.c",
-])
\ No newline at end of file
+ "time/timer.c"
+)
\ No newline at end of file
-@Collection
+from . import fs, mm
+
+@"Kernel Feature"
def kernel_feature():
""" Config kernel features """
pass
-
-include("fs")
-include("mm")
-sources([
+src.c += (
"blkpart_gpt.c",
"blk_mapping.c",
"blkio.c",
"block.c",
"blkbuf.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-sources([
+src.c += (
"failsafe.c",
"trace.c",
# "gdbstub.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-sources([
+src.c += (
"device.c",
"potentem.c",
"devdb.c",
"devfs.c",
"input.c",
"poll.c",
-])
+)
-sources([
+src.c += (
"waitq.c",
"buffer.c",
"rbuffer.c",
"hstr.c",
"fifo.c",
"rwlock.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-use("elf-generic")
+import_("elf-generic")
-sources([
- "exec.c"
-])
\ No newline at end of file
+src.c += "exec.c"
\ No newline at end of file
-sources([
+src.c += (
"elfbfmt.c",
"ldelf.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-use("twifs")
-use("ramfs")
+from . import twifs, ramfs
-if configured("fs_iso9660"):
- use("iso9660")
+if config.fs_ext2:
+ from . import ext2
-if configured("fs_ext2"):
- use("ext2")
+if config.fs_iso9660:
+ from . import iso9660
-sources([
+src.c += (
"twimap.c",
"pcache.c",
"mount.c",
"path_walk.c",
"fsm.c",
"fs_export.c",
-])
+)
+from . import ext2
-@Collection("File Systems")
+@"File Systems"
+@(parent := kernel_feature)
def file_system():
""" Config feature related to file system supports """
- add_to_collection(kernel_feature)
-
- @Term("ext2 support")
- def fs_ext2():
+ @"ext2 support"
+ def fs_ext2() -> bool:
""" Enable ext2 file system support """
- type(bool)
- default(True)
+ return True
- @Term("iso9660 support")
- def fs_iso9660():
+ @"iso9660 support"
+ def fs_iso9660() -> bool:
""" Enable iso9660 file system support """
- type(bool)
- default(True)
-
-
-include("ext2")
\ No newline at end of file
+ return True
-sources([
+src.c += (
"alloc.c",
"dir.c",
"file.c",
"group.c",
"inode.c",
"mount.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-@Collection("ext2")
+@"ext2"
+@(parent := file_system)
def ext2_fs():
- add_to_collection(file_system)
- @Term("Debug Messages")
- def ext2_debug_msg():
- type(bool)
- default(False)
+ require(fs_ext2)
- return v(fs_ext2)
\ No newline at end of file
+ @"Debug Messages"
+ def ext2_debug_msg() -> bool:
+ return False
\ No newline at end of file
-sources([
+src.c += (
"inode.c",
"file.c",
"mount.c",
"directory.c",
"utils.c",
"rockridge.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-sources([
- "ramfs.c"
-])
\ No newline at end of file
+src.c += "ramfs.c"
\ No newline at end of file
-sources([
- "twifs.c"
-])
\ No newline at end of file
+src.c += "twifs.c"
\ No newline at end of file
-sources([
+src.c += (
"mmap.c",
"valloc.c",
"cake.c",
"cake_export.c",
"vmap.c",
"dmm.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
-@Collection("Memory Management")
+@"Memory Management"
+@(parent := kernel_feature)
def memory_subsystem():
""" Config the memory subsystem """
- @Collection("Physical Memory")
+ @"Physical Memory"
def physical_mm():
""" Physical memory manager """
- @Term("Allocation policy")
- def pmalloc_method():
+ @flag
+ def pmalloc_method_simple() -> bool:
+ return pmalloc_method.val == "simple"
+
+ @flag
+ def pmalloc_method_buddy() -> bool:
+ return pmalloc_method.val == "buddy"
+
+ @flag
+ def pmalloc_method_ncontig() -> bool:
+ return pmalloc_method.val == "ncontig"
+
+ @"Allocation policy"
+ def pmalloc_method() -> "simple" | "buddy" | "ncontig":
""" Allocation policy for phiscal memory """
- type(["simple", "buddy", "ncontig"])
- default("simple")
+ return "simple"
- @Group("Simple")
+ @"PMalloc Thresholds"
def pmalloc_simple_po_thresholds():
+
+ require(pmalloc_method_simple)
- @Term("Maximum cached order-0 free pages")
- def pmalloc_simple_max_po0():
+ @"Maximum cached order-0 free pages"
+ def pmalloc_simple_max_po0() -> int:
""" free list capacity for order-0 pages """
- type(int)
- default(4096)
+ return 4096
- @Term("Maximum cached order-1 free pages")
- def pmalloc_simple_max_po1():
+ @"Maximum cached order-1 free pages"
+ def pmalloc_simple_max_po1() -> int:
""" free list capacity for order-1 pages """
- type(int)
- default(2048)
+ return 2048
- @Term("Maximum cached order-2 free pages")
- def pmalloc_simple_max_po2():
+ @"Maximum cached order-2 free pages"
+ def pmalloc_simple_max_po2() -> int:
""" free list capacity for order-2 pages """
- type(int)
- default(2048)
+ return 2048
- @Term("Maximum cached order-3 free pages")
- def pmalloc_simple_max_po3():
+ @"Maximum cached order-3 free pages"
+ def pmalloc_simple_max_po3() -> int:
""" free list capacity for order-3 pages """
- type(int)
- default(2048)
+ return 2048
- @Term("Maximum cached order-4 free pages")
- def pmalloc_simple_max_po4():
+ @"Maximum cached order-4 free pages"
+ def pmalloc_simple_max_po4() -> int:
""" free list capacity for order-4 pages """
- type(int)
- default(512)
+ return 512
- @Term("Maximum cached order-5 free pages")
- def pmalloc_simple_max_po5():
+ @"Maximum cached order-5 free pages"
+ def pmalloc_simple_max_po5() -> int:
""" free list capacity for order-5 pages """
- type(int)
- default(512)
+ return 512
- @Term("Maximum cached order-6 free pages")
- def pmalloc_simple_max_po6():
+ @"Maximum cached order-6 free pages"
+ def pmalloc_simple_max_po6() -> int:
""" free list capacity for order-6 pages """
- type(int)
- default(128)
+ return 128
- @Term("Maximum cached order-7 free pages")
- def pmalloc_simple_max_po7():
+ @"Maximum cached order-7 free pages"
+ def pmalloc_simple_max_po7() -> int:
""" free list capacity for order-7 pages """
- type(int)
- default(128)
+ return 128
- @Term("Maximum cached order-8 free pages")
- def pmalloc_simple_max_po8():
+ @"Maximum cached order-8 free pages"
+ def pmalloc_simple_max_po8() -> int:
""" free list capacity for order-8 pages """
- type(int)
- default(64)
+ return 64
- @Term("Maximum cached order-9 free pages")
- def pmalloc_simple_max_po9():
+ @"Maximum cached order-9 free pages"
+ def pmalloc_simple_max_po9() -> int:
""" free list capacity for order-9 pages """
- type(int)
- default(32)
-
- return v(pmalloc_method) == "simple"
-
- add_to_collection(kernel_feature)
\ No newline at end of file
+ return 32
\ No newline at end of file
-sources([
+src.c += (
"signal.c",
"sched.c",
"fork.c",
"thread.c",
"preemption.c",
"switch.c",
-])
\ No newline at end of file
+)
\ No newline at end of file
-sources([
+src.c += (
"crc.c",
"hash.c",
"klibc/itoa.c",
"klibc/string/strcpy.c",
"klibc/string/strlen.c",
"klibc/string/trim.c"
-])
\ No newline at end of file
+)
\ No newline at end of file
include utils.mkinc
include $(lbuild_mkinc)
+include $(lconfig_mkinc)
-ksrc_objs := $(addsuffix .o,$(_LBUILD_SRCS))
-ksrc_deps := $(addsuffix .d,$(_LBUILD_SRCS))
-khdr_opts := $(addprefix -include ,$(_LBUILD_HDRS))
-kinc_opts := $(addprefix -I,$(_LBUILD_INCS))
+ksrc_objs := $(addsuffix .o,$(BUILD_SRCS))
+ksrc_deps := $(addsuffix .d,$(BUILD_SRCS))
+khdr_opts := $(addprefix -include ,$(BUILD_HDR))
+kinc_opts := $(addprefix -I,$(BUILD_INC))
config_h += -include $(lbuild_config_h)
kcflags := $(khdr_opts) $(kinc_opts) $(config_h)
+CFLAGS += $(BUILD_CFLAGS)
+LDFLAGS += $(BUILD_LDFLAGS)
+
-include $(ksrc_deps)
\ No newline at end of file
lbuild_dir := $(CURDIR)/.builder
-lbuild_config_h := $(lbuild_dir)/configs.h
-lbuild_mkinc := $(lbuild_dir)/lbuild.mkinc
+lbuild_config_h := $(lbuild_dir)/config.h
+lbuild_mkinc := $(lbuild_dir)/build.mkinc
+lconfig_mkinc := $(lbuild_dir)/config.mkinc
lconfig_save := $(CURDIR)/.config.json
-lbuild_opts := --lconfig-file LConfig
-
all_lconfigs = $(shell find $(CURDIR) -name "LConfig")
+all_lbuilds = $(shell find $(CURDIR) -name "LBuild")
+
+.PHONY: __gen_config __gen_build __gen_both config
-LCONFIG_FLAGS += --config $(lbuild_opts) --config-save $(lconfig_save)
+# Recipe fragment expanded by the $(lconfig_save) and $(lconfig_mkinc) rules:
+# prints a notice, then runs $(LBUILD) with --gen-config and $(lbuild_dir)
+# as its argument.
+define __gen_config
+ @echo restarting configuration...
+ @$(LBUILD) --gen-config $(lbuild_dir)
+endef
+
+# Recipe fragment expanded by the rules whose prerequisites are the LBuild
+# files ($(lbuild_config_h) and $(lbuild_mkinc)): prints a notice, then runs
+# $(LBUILD) with --gen-build and $(lbuild_dir) as its argument.
+# It must not pass --gen-config, otherwise it is just a copy of __gen_config
+# and build generation is never requested by these rules.
+# NOTE(review): the $(lbuild_config_h) rule also expands this macro; confirm
+# that --gen-build is the mode expected to refresh that header.
+define __gen_build
+ @echo restarting configuration...
+ @$(LBUILD) --gen-build $(lbuild_dir)
+endef
+
+# Recipe fragment expanded by the `config` target: prints a notice, then runs
+# $(LBUILD) once with both --gen-build and --gen-config, passing $(lbuild_dir).
+define __gen_both
+ @echo restarting configuration...
+ @$(LBUILD) --gen-build --gen-config $(lbuild_dir)
+endef
export
$(lconfig_save): $(all_lconfigs)
- @echo restarting configuration...
- @$(LBUILD) $(LCONFIG_FLAGS) --force -o $(lbuild_dir)/
+ $(call __gen_config)
export
-$(lbuild_config_h): $(lconfig_save)
- @$(LBUILD) $(LCONFIG_FLAGS) -o $(@D)
+$(lconfig_mkinc): $(all_lconfigs)
+ $(call __gen_config)
export
-$(lbuild_mkinc): $(lbuild_config_h)
- @$(LBUILD) LBuild $(lbuild_opts) -o $(@D)
+$(lbuild_config_h): $(all_lbuilds)
+ $(call __gen_build)
-.PHONY: config
export
-config: $(all_lconfigs)
- @$(LBUILD) $(LCONFIG_FLAGS) --force\
- -o $(lbuild_dir)/
+$(lbuild_mkinc): $(all_lbuilds)
+ $(call __gen_build)
+
+config: $(all_lbuilds) $(all_lconfigs)
+ $(call __gen_both)
AR := $(CX_PREFIX)ar
LBUILD ?= $(shell realpath ./scripts/build-tools/luna_build.py)
-
-W := -Wall -Wextra -Werror \
- -Wno-unknown-pragmas \
- -Wno-unused-function \
- -Wno-unused-variable\
- -Wno-unused-but-set-variable \
- -Wno-unused-parameter \
- -Wno-discarded-qualifiers\
- -Werror=incompatible-pointer-types
-
-OFLAGS := -fno-omit-frame-pointer \
- -finline-small-functions
-
-CFLAGS := -std=gnu99 $(OFLAGS) $(W) -g
-
ifeq ($(MODE),debug)
O := -Og
else
O := -O2
endif
-CFLAGS += $(O)
-
LDFLAGS := $(O)
-
+CFLAGS := -std=gnu99 -g $(O)
MKFLAGS := --no-print-directory
\ No newline at end of file
+++ /dev/null
-from lbuild.api import BuildGenerator
-from lbuild.common import BuildEnvironment
-from lib.utils import join_path
-from os import getenv
-
-class MakefileBuildGen(BuildGenerator):
- def __init__(self, out_dir, name = "lbuild.mkinc") -> None:
- self.__path = join_path(out_dir, name)
-
- def emit_makearray(self, name, values):
- r = []
- r.append(f"{name} :=")
- for v in values:
- r.append(f"{v}")
- return [" ".join(r)]
-
- def generate(self, env: BuildEnvironment):
- path = env.to_wspath(self.__path)
- lines = []
-
- opts = env.get_object("CC_OPTS", [])
- lines.append("CFLAGS += %s"%(" ".join(opts)))
-
- opts = env.get_object("LD_OPTS", [])
- lines.append("LDFLAGS += %s"%(" ".join(opts)))
-
- arr = self.emit_makearray("_LBUILD_SRCS", env.srcs())
- lines += arr
-
- arr = self.emit_makearray("_LBUILD_HDRS", env.headers())
- lines += arr
-
- arr = self.emit_makearray("_LBUILD_INCS", env.includes())
- lines += arr
-
- with open(path, 'w') as f:
- f.write("\n".join(lines))
-
-
-def install_lbuild_functions(_env: BuildEnvironment):
- def set_opts(env: BuildEnvironment, name, opts, override):
- if not isinstance(opts, list):
- opts = [opts]
-
- _opts = env.get_object(name, [])
-
- if override:
- _opts = opts
- else:
- _opts += opts
-
- env.set_object(name, _opts)
-
- def compile_opts(env: BuildEnvironment, opts, override=False):
- set_opts(env, "CC_OPTS", opts, override)
-
- def linking_opts(env: BuildEnvironment, opts, override=False):
- set_opts(env, "LD_OPTS", opts, override)
-
- def env(env, name, default=None):
- return getenv(name, default)
-
-
- _env.add_external_func(compile_opts)
- _env.add_external_func(linking_opts)
- _env.add_external_func(env)
\ No newline at end of file
+++ /dev/null
-from lcfg.api import ConfigIOProvider
-from lcfg.utils import is_basic
-
-import re
-
-class CHeaderConfigProvider(ConfigIOProvider):
- def __init__(self, header_save,
- header_prefix="CONFIG_") -> None:
- self.__header_export = header_save
- self.__prefix = header_prefix
- self.__re = re.compile(r"^[A-Za-z0-9_]+$")
-
- def export(self, env, config_dict):
- lines = []
- for k, v in config_dict.items():
- result = [ "#define" ]
- s = str.upper(k)
- s = f"{self.__prefix}{s}"
-
- if isinstance(v, str) and self.__re.match(v):
- s = f"{s}_{str.upper(v)}"
- v = ""
-
- result.append(s)
-
- v = self.serialize_value(v)
- if v is None or v is False:
- result.insert(0, "//")
- elif isinstance(v, str):
- result.append(v)
-
- lines.append(" ".join(result))
-
- with open(self.__header_export, 'w') as f:
- f.write("\n".join(lines))
-
-
- def serialize_value(self, v):
- if v is None:
- return None
-
- if isinstance(v, bool):
- return v
-
- if v and isinstance(v, str):
- return f'"{v}"'
-
- if is_basic(v):
- return str(v)
-
- raise ValueError(
- f"serialising {type(v)}: not supported")
+++ /dev/null
-from lbuild.api import ConfigProvider
-from lcfg.common import LConfigEnvironment
-
-class LConfigProvider(ConfigProvider):
- def __init__(self, lcfg_env: LConfigEnvironment) -> None:
- super().__init__()
- self.__env = lcfg_env
-
- def configured_value(self, name):
- return self.__env.lookup_value(name)
-
- def has_config(self, name):
- try:
- v = self.__env.lookup_value(name)
- return not not v
- except:
- return False
\ No newline at end of file
+++ /dev/null
-from lcfg.api import (
- RenderContext,
- InteractiveRenderer,
- Renderable,
- ConfigTypeCheckError,
- ConfigLoadException
-)
-from lcfg.common import LConfigEnvironment
-
-import shlex
-from lib.utils import join_path
-from pathlib import Path
-import readline
-
-class ShellException(Exception):
- def __init__(self, *args: object) -> None:
- super().__init__(*args)
-
-class ViewElement:
- def __init__(self, label, node) -> None:
- self.label = label
- self.node = node
-
- def expand(self, sctx):
- return False
-
- def read(self, sctx):
- return None
-
- def write(self, sctx, val):
- return None
-
- def get_help(self, sctx):
- return self.node.help_prompt()
-
- def get_type(self, sctx):
- return "N/A"
-
- def draw(self, sctx, canvas):
- pass
-
-
-class SubviewLink(ViewElement):
- def __init__(self, label, node, cb) -> None:
- super().__init__(label, node)
- self.__callback = cb
-
- def expand(self, sctx):
- sctx.clear_view()
- self.__callback(sctx)
- return True
-
- def draw(self, sctx, canvas):
- print(f" [{self.label}]")
-
- def get_type(self, sctx):
- return f"Collection: {self.label}"
-
-class RootWrapper(ViewElement):
- def __init__(self, root) -> None:
- self.__root = root
- super().__init__("root", None)
-
- def expand(self, sctx):
- sctx.clear_view()
- self.__root.render(sctx)
- return True
-
- def draw(self, sctx, canvas):
- pass
-
- def get_type(self, sctx):
- return ""
-
-class FieldView(ViewElement):
- def __init__(self, label, node) -> None:
- super().__init__(label, node)
-
- def read(self, sctx):
- return self.node.get_value()
-
- def write(self, sctx, val):
- if self.node.read_only():
- return None
-
- self.node.set_value(val)
- return val
-
- def get_type(self, sctx):
- return f"Config term: {self.label}\n{self.node.get_type()}"
-
- def draw(self, sctx, canvas):
- suffix = ""
- if self.node.read_only():
- suffix = " (ro)"
- print(f" {self.label}{suffix}")
-
-class ShellContext(RenderContext):
- def __init__(self) -> None:
- super().__init__()
-
- self._view = {}
- self._subviews = []
- self._field = []
-
- def add_expandable(self, label, node, on_expand_cb):
- name = node.get_name()
- self._view[name] = SubviewLink(name, node, on_expand_cb)
- self._subviews.append(name)
-
- def add_field(self, label, node):
- name = node.get_name()
- self._view[name] = FieldView(name, node)
- self._field.append(name)
-
- def clear_view(self):
- self._view.clear()
- self._subviews.clear()
- self._field.clear()
-
- def redraw(self):
- for v in self._subviews + self._field:
- self._view[v].draw(self, None)
-
- def get_view(self, label):
- if label in self._view:
- return self._view[label]
- return None
-
-class InteractiveShell(InteractiveRenderer):
- def __init__(self, root: Renderable) -> None:
- super().__init__()
- self.__levels = [RootWrapper(root)]
- self.__aborted = True
- self.__sctx = ShellContext()
- self.__cmd = {
- "ls": (
- "list all config node under current collection",
- lambda *args:
- self.__sctx.redraw()
- ),
- "help": (
- "print help prompt for given name of node",
- lambda *args:
- self.print_help(*args)
- ),
- "type": (
- "print the type of a config node",
- lambda *args:
- self.print_type(*args)
- ),
- "cd": (
- "navigate to a collection node given a unix-like path",
- lambda *args:
- self.get_in(*args)
- ),
- "usage": (
- "print out the usage",
- lambda *args:
- self.print_usage()
- )
- }
-
- def get_path(self):
- l = [level.label for level in self.__levels[1:]]
- return f"/{'/'.join(l)}"
-
- def resolve(self, relative):
- ctx = ShellContext()
- p = join_path(self.get_path(), relative)
- ps = Path(p).resolve().parts
- if ps[0] == '/':
- ps = ps[1:]
-
- node = self.__levels[0]
- levels = [node]
- for part in ps:
- if not node.expand(ctx):
- raise ShellException(f"node is not a collection: {part}")
-
- node = ctx.get_view(part)
- if not node:
- raise ShellException(f"no such node: {part}")
-
- levels.append(node)
-
- return (node, levels)
-
- def print_usage(self):
- for cmd, (desc, _) in self.__cmd.items():
- print("\n".join([
- cmd,
- f" {desc}",
- ""
- ]))
-
- def print_help(self, node_name):
- view, _ = self.resolve(node_name)
-
- print(view.get_help(self.__sctx))
-
- def print_type(self, node_name):
- view, _ = self.resolve(node_name)
-
- print(view.get_type(self.__sctx))
-
- def do_read(self, node_name):
- view, _ = self.resolve(node_name)
- rd_val = view.read(self.__sctx)
- if rd_val is None:
- raise ShellException(f"config node {view.label} is not readable")
-
- print(rd_val)
-
- def do_write(self, node_name, val):
- view, _ = self.resolve(node_name)
- wr = view.write(self.__sctx, val)
- if not wr:
- raise ShellException(f"config node {view.label} is read only")
-
- print(f"write: {val}")
- self.do_render()
-
- def get_in(self, node_name):
- view, lvl = self.resolve(node_name)
-
- if not view.expand(self.__sctx):
- print(f"{node_name} is not a collection")
- return
-
- self.__levels = lvl
-
- def do_render(self):
-
- curr = self.__levels[-1]
- curr.expand(self.__sctx)
-
- def __loop(self):
- prefix = "config: %s> "%(self.__levels[-1].label)
-
- line = input(prefix)
- args = shlex.split(line)
- if not args:
- return True
-
- cmd = args[0]
- if cmd in self.__cmd:
- self.__cmd[cmd][1](*args[1:])
- return True
-
- if cmd == "exit":
- self.__aborted = False
- return False
-
- if cmd == "abort":
- return False
-
- node = self.resolve(cmd)
- if not node:
- raise ShellException(f"unrecognised command {line}")
-
- if len(args) == 3 and args[1] == '=':
- self.do_write(args[0], args[2])
- return True
-
- if len(args) == 1:
- self.do_read(args[0])
- return True
-
- print(f"unrecognised command {line}")
- return True
-
-
- def render_loop(self):
- self.do_render()
- print("\n".join([
- "Interactive LConfig Shell",
- " type 'usage' to find out how to use",
- " type 'exit' to end (with saving)",
- " type 'abort' to abort (without saving)",
- " type node name directly to read the value",
- " type 'node = val' to set 'val' to 'node'",
- ""
- ]))
- while True:
- try:
- if not self.__loop():
- break
- except KeyboardInterrupt:
- break
- except ConfigTypeCheckError as e:
- print(e.args[0])
- except ShellException as e:
- print(e.args[0])
-
- return not self.__aborted
\ No newline at end of file
+++ /dev/null
-class ConfigProvider:
- def __init__(self) -> None:
- pass
-
- def configured_value(self, name):
- raise ValueError(f"config '{name}' is undefined or disabled")
-
- def has_config(self, name):
- return False
-
-class BuildGenerator:
- def __init__(self) -> None:
- pass
-
- def generate(self, env):
- pass
\ No newline at end of file
--- /dev/null
+import ast
+from pathlib import Path
+from .common import DirectoryTracker
+from lib.utils import ConfigAST, Schema
+
+class LBuildImporter(ast.NodeTransformer):
+    """
+    Flatten one LBuild file, and every LBuild file it pulls in, into a
+    single list of statements.
+
+    The file is parsed and transformed on construction; `tree()` hands back
+    the result, bracketed by DirectoryTracker enter/leave calls for the
+    directory that contains the file.
+    """
+
+    # Pattern for an `import_(<constant>)` call.
+    # NOTE(review): matching semantics live in lib.utils.Schema - confirm.
+    ConfigImportFn = Schema(
+        ast.Call,
+        func=Schema(ast.Name, id="import_"),
+        args=[ast.Constant])
+
+    # Pattern for an attribute access on a bare name, i.e. `<name>.<attr>`.
+    ScopedAccess = Schema(
+        ast.Attribute,
+        value=ast.Name)
+
+    def __init__(self, env, file):
+        """
+        Parse and transform `file`.
+
+        env:  object exposing a `scopes` mapping (scope name -> provider)
+        file: path object of the LBuild file (`.parent` and `.open()` are used)
+        """
+        super().__init__()
+        self.__env = env
+        self.__parent = file.parent
+
+        with file.open('r') as f:
+            source = f.read()
+
+        # Order matters: the enter call is emitted first, then the
+        # (recursively expanded) body, then the matching leave call.
+        enter = DirectoryTracker.emit_enter(self.__parent)
+        body = self.visit(ast.parse(source)).body
+        leave = DirectoryTracker.emit_leave()
+        self.__tree = [enter, *body, leave]
+
+    def tree(self):
+        """Return the transformed statement list built by the constructor."""
+        return self.__tree
+
+    def __gen_import_subtree(self, relpath, variants):
+        """
+        Expand `<relpath>/<variant>/LBuild` for each variant, in order, and
+        wrap all resulting statements in one `if True:` node so a single
+        statement can stand in for the import.
+        """
+        statements = []
+        for variant in variants:
+            lbuild = relpath / variant / "LBuild"
+            statements.extend(LBuildImporter(self.__env, lbuild).tree())
+
+        return ast.If(test=ast.Constant(True), body=statements, orelse=[])
+
+    def visit_ImportFrom(self, node):
+        """
+        Replace a `from <module> import a, b` that matches
+        ConfigAST.ConfigImport with the expanded LBuild files of `a` and `b`
+        found under the dotted module path, relative to this file's
+        directory. Other imports are returned untouched.
+        """
+        if ConfigAST.ConfigImport != node:
+            return node
+
+        # `from . import x` has no module name; treat it as the empty path
+        dotted = node.module or ""
+        base = self.__parent / Path(*dotted.split('.'))
+
+        names = [alias.name for alias in node.names]
+        return self.__gen_import_subtree(base, names)
+
+    def visit_Attribute(self, node):
+        """
+        Rewrite `<scope>.<attr>` into `<scope context name>["<attr>"]` when
+        `<scope>` is a scope registered on the environment; everything else
+        gets the default traversal.
+        """
+        if (LBuildImporter.ScopedAccess != node
+                or node.value.id not in self.__env.scopes):
+            return self.generic_visit(node)
+
+        provider = self.__env.scopes[node.value.id]
+        target = ast.Name(provider.context_name(), ctx=node.value.ctx)
+
+        return ast.Subscript(
+            value=target,
+            slice=ast.Constant(node.attr),
+            ctx=node.ctx
+        )
+
+    def visit_Expr(self, node):
+        """
+        Replace a bare `import_("name")` statement with the expanded LBuild
+        file of `name`, looked up next to this file.
+        """
+        call = node.value
+        if LBuildImporter.ConfigImportFn == call:
+            target = call.args[0].value
+            return self.__gen_import_subtree(self.__parent, [target])
+
+        return self.generic_visit(node)
+
+class BuildEnvironment:
+    """
+    Compiles a tree of LBuild files once (`load`) and executes the compiled
+    code against the registered scopes on demand (`update`).
+    """
+
+    def __init__(self):
+        # scope name -> registered scope object
+        self.scopes = dict()
+
+    def load(self, rootfile):
+        """Compile the LBuild tree rooted at `rootfile` for later `update()` calls."""
+        self.__exec = self.__load(Path(rootfile))
+
+    def register_scope(self, scope):
+        """
+        Register `scope` under `scope.name`.
+
+        Raises Exception when a scope of that name is already registered.
+        """
+        name = scope.name
+        if name in self.scopes:
+            raise Exception(f"{name} already exists")
+        self.scopes[name] = scope
+
+    def __load(self, rootfile):
+        """Expand `rootfile` through LBuildImporter and compile the result."""
+        statements = LBuildImporter(self, rootfile).tree()
+
+        # nodes synthesised by the importer carry no line/column info yet
+        module = ast.fix_missing_locations(
+            ast.Module(statements, type_ignores=[]))
+        return compile(module, rootfile, 'exec')
+
+    def update(self):
+        """
+        Reset every registered scope, then run the compiled build code with
+        the scopes and a fresh DirectoryTracker bound as globals.
+        """
+        context = {"__LBUILD__": True}
+
+        for scope in self.scopes.values():
+            scope.reset()
+            context[scope.context_name()] = scope
+
+        DirectoryTracker.bind(context)
+        exec(self.__exec, context)
\ No newline at end of file
-from lib.utils import join_path
-import os
+import ast
+from pathlib import Path
-class BuildEnvironment:
- def __init__(self, workspace_dir, generator) -> None:
- self.__config_provider = None
- self.__sources = []
- self.__headers = []
- self.__inc_dir = []
- self.__ws_dir = workspace_dir
- self.__ext_object = {}
- self.__ext_function = {}
- self.__generator = generator
+class DirectoryTracker:
+    def __init__(self):
+        """Start with an empty directory stack."""
+        self.__stack = list()
- def set_config_provider(self, provider):
- self.__config_provider = provider
-
- def config_provider(self):
- return self.__config_provider
-
- def add_sources(self, sources):
- self.__sources += sources
+    def push(self, dir):
+        """Push `dir`, converted to a Path, on top of the directory stack."""
+        entered = Path(dir)
+        self.__stack.append(entered)
- def add_headers(self, sources):
- for h in sources:
- if not os.path.isdir(h):
- self.__headers.append(h)
- else:
- self.__inc_dir.append(h)
-
- def to_wspath(self, file):
- path = join_path(self.__ws_dir, file)
- return os.path.relpath(path, self.__ws_dir)
+    def pop(self):
+        """Drop the top entry of the directory stack; nothing is returned."""
+        stack = self.__stack
+        stack.pop()
- def export(self):
- self.__generator.generate(self)
-
- def get_object(self, key, _default=None):
- return _default if key not in self.__ext_object else self.__ext_object[key]
+    def active_relative(self):
+        """
+        Return the top of the stack expressed relative to the bottom of the
+        stack (the first directory that was pushed).
+        """
+        bottom, top = self.__stack[0], self.__stack[-1]
+        return top.relative_to(bottom)
- def set_object(self, key, object):
- self.__ext_object[key] = object
-
- def srcs(self):
- return list(self.__sources)
-
- def headers(self):
- return list(self.__headers)
+    @staticmethod
+    def context_name():
+        """Key under which a tracker is stored in a build context mapping."""
+        key = "__FILESTACK__"
+        return key
- def includes(self):
- return list(self.__inc_dir)
+    @staticmethod
+    def get(context):
+        """Look up the tracker stored in the `context` mapping."""
+        key = DirectoryTracker.context_name()
+        return context[key]
- def add_external_func(self, function):
- name = function.__name__
- invk = lambda *args, **kwargs: function(self, *args, **kwargs)
- self.__ext_function[name] = invk
-
- def external_func_table(self):
- return self.__ext_function
\ No newline at end of file
+    @staticmethod
+    def bind(context):
+        """Store a fresh, empty tracker in the `context` mapping."""
+        context[DirectoryTracker.context_name()] = DirectoryTracker()
+
+    @staticmethod
+    def emit_enter(dir):
+        """
+        Build the statement node for `__FILESTACK__.push("<dir>")`, where
+        `<dir>` is `str(dir)`.
+        """
+        tracker = ast.Name(DirectoryTracker.context_name(), ctx=ast.Load())
+        push = ast.Attribute(value=tracker, attr="push", ctx=ast.Load())
+        call = ast.Call(
+            func=push,
+            args=[ast.Constant(str(dir))],
+            keywords=[])
+        return ast.Expr(call)
+
+    @staticmethod
+    def emit_leave():
+        """Build the statement node for `__FILESTACK__.pop()`."""
+        tracker = ast.Name(DirectoryTracker.context_name(), ctx=ast.Load())
+        pop = ast.Attribute(value=tracker, attr="pop", ctx=ast.Load())
+        call = ast.Call(func=pop, args=[], keywords=[])
+        return ast.Expr(call)
+++ /dev/null
-from lib.sandbox import Sandbox
-from lib.utils import join_path
-from .common import BuildEnvironment
-
-import os
-
-class LunaBuildFile(Sandbox):
- def __init__(self, env: BuildEnvironment, path) -> None:
- super().__init__({
- "_script":
- path,
- "sources":
- lambda src: self.export_sources(src),
- "headers":
- lambda hdr: self.export_headers(hdr),
- "configured":
- lambda name: self.check_config(name),
- "config":
- lambda name: self.read_config(name),
- "use":
- lambda file: self.import_buildfile(file),
- **env.external_func_table()
- })
-
- self.__srcs = []
- self.__hdrs = []
- self.__env = env
-
- self.__path = path
- self.__dir = os.path.dirname(path)
-
- def resolve(self):
- self.execute(self.__path)
- self.__env.add_sources(self.__srcs)
- self.__env.add_headers(self.__hdrs)
-
- def __do_process(self, list):
- resolved = []
- for entry in list:
- if not entry:
- continue
- resolved.append(self.__resolve_value(entry))
- return resolved
-
- def expand_select(self, val):
- tests = list(val.keys())
- if len(tests) != 1:
- raise TypeError(
- "select statement must have exactly one conditional")
-
- test = tests[0]
- outcomes = val[test]
- if test not in outcomes:
- self.__raise("unbounded select")
- return outcomes[test]
-
- def __resolve_value(self, source):
- resolved = source
- while isinstance(resolved, dict):
- if isinstance(resolved, dict):
- resolved = self.expand_select(resolved)
- else:
- self.__raise(f"entry with unknown type: {resolved}")
-
- if isinstance(resolved, list):
- rs = []
- for file in resolved:
- file = join_path(self.__dir, file)
- file = self.__env.to_wspath(file)
- rs.append(file)
- else:
- rs = join_path(self.__dir, resolved)
- rs = [self.__env.to_wspath(rs)]
-
- return rs
-
- def import_buildfile(self, path):
- path = self.__resolve_value(path)
- for p in path:
- if (os.path.isdir(p)):
- p = os.path.join(p, "LBuild")
-
- if not os.path.exists(p):
- self.__raise("Build file not exist: %s", p)
-
- if os.path.abspath(p) == os.path.abspath(self.__path):
- self.__raise("self dependency detected")
-
- LunaBuildFile(self.__env, p).resolve()
-
- def export_sources(self, src):
- src = self.__resolve_value(src)
- self.__srcs += src
-
- def export_headers(self, hdr):
- hdr = self.__resolve_value(hdr)
- self.__hdrs += hdr
-
- def check_config(self, name):
- return self.__env.config_provider().has_config(name)
-
- def read_config(self, name):
- return self.__env.config_provider().configured_value(name)
-
- def __raise(self, msg, *kargs):
- raise Exception(msg%kargs)
\ No newline at end of file
--- /dev/null
+import inspect
+from lib.utils import Schema
+from .common import DirectoryTracker
+
+class ScopeProvider:
+    """
+    A named scope that owns a set of accessors, reachable by subscript
+    (`provider["name"]`).
+    """
+
+    # Value patterns used by the accessors when adding values.
+    # NOTE(review): matching semantics live in lib.utils.Schema - confirm.
+    Enumerables = Schema(Schema.Union(list, tuple))
+    Primitives = Schema(Schema.Union(str, int, bool))
+
+    def __init__(self, name):
+        super().__init__()
+        self.name = name
+        # accessor name -> accessor object
+        self.__accessors = {}
+
+    def __add_accessor(self, accessor, factory):
+        """
+        Register `accessor`; a plain string is first turned into an accessor
+        by calling `factory` with it. The accessor is keyed by its `name`.
+        """
+        entry = factory(accessor) if isinstance(accessor, str) else accessor
+        self.__accessors[entry.name] = entry
+
+    def subscope(self, accessor):
+        """Register a subscope; a string becomes a ScopeAccessor of that name."""
+        self.__add_accessor(accessor, ScopeAccessor)
+
+    def file_subscope(self, accessor):
+        """Register a subscope; a string becomes a FileScopeAccessor of that name."""
+        self.__add_accessor(accessor, FileScopeAccessor)
+
+    def accessors(self):
+        """Return a view of all registered accessor objects."""
+        return self.__accessors.values()
+
+    def reset(self):
+        """Call `reset()` on every registered accessor."""
+        for accessor in self.__accessors.values():
+            accessor.reset()
+
+    def context_name(self):
+        """Name of the global variable this scope is bound to in build code."""
+        return f"__SC{self.name}"
+
+    def __getitem__(self, name):
+        """Return the accessor registered as `name`, or None if there is none."""
+        return self.__accessors.get(name)
+
+    def __setitem__(self, name, val):
+        # Deliberately ignored: item assignment never replaces an accessor.
+        pass
+
+class ScopeAccessor:
+    """
+    A named list of values that build code appends to, typically through
+    `accessor += value`.
+
+    Values may only be added while build code is on the call stack, i.e.
+    while some frame's globals contain `__LBUILD__`.
+    """
+
+    def __init__(self, name):
+        super().__init__()
+        self.name = name
+        self.__values = []
+        # Globals of the build code that is currently adding values;
+        # refreshed by every add() and available to put_value() overrides.
+        self._active_context = None
+
+    def values(self):
+        """Return the live list of collected values (not a copy)."""
+        return self.__values
+
+    def reset(self):
+        """Discard all collected values."""
+        self.__values.clear()
+
+    def add(self, x):
+        """
+        Add `x` to this accessor.
+
+        Enumerables (see ScopeProvider.Enumerables) are added element by
+        element, recursively; primitives (see ScopeProvider.Primitives) go
+        through `put_value`.
+
+        Raises Exception when called outside build code or when `x` is
+        neither an enumerable nor a primitive.
+        """
+        self.__load_build_context()
+
+        if ScopeProvider.Enumerables == x:
+            for v in x:
+                self.add(v)
+            return
+
+        if ScopeProvider.Primitives == x:
+            self.put_value(x)
+            return
+
+        raise Exception(f"invalid value '{x}' of type ('{type(x).__name__}')")
+
+    def put_value(self, x):
+        """Store one primitive value; subclasses override this to transform it."""
+        self.__values.append(x)
+
+    def __load_build_context(self):
+        """
+        Locate the globals of the calling build code and remember them in
+        `_active_context`.
+
+        Raises Exception when no frame on the stack belongs to build code.
+        """
+        self._active_context = None
+
+        # context=0: only the frame objects are needed, so do not make
+        # inspect read source lines for every frame.
+        for f in inspect.stack(0):
+            if "__LBUILD__" not in f.frame.f_globals:
+                continue
+            # no break: the outermost matching frame wins
+            self._active_context = f.frame.f_globals
+
+        if self._active_context is None:
+            raise Exception(
+                f"unable to modify '{self.name}' outside build context")
+
+    def __iadd__(self, other):
+        """
+        Implement `accessor += other` as `add(other)`.
+
+        Returns self, as the in-place operator protocol requires; without
+        that the assignment target would be rebound to None.
+        """
+        self.add(other)
+        return self
+
+class FileScopeAccessor(ScopeAccessor):
+    """
+    ScopeAccessor for file paths: each value is stored prefixed with the
+    directory that is active in the build context at the time it is added.
+    """
+
+    def __init__(self, name):
+        super().__init__(name)
+
+    def put_value(self, x):
+        """Store `x` joined onto the tracker's active relative directory, as a string."""
+        active_dir = DirectoryTracker.get(self._active_context).active_relative()
+        return super().put_value(str(active_dir / x))
+
+++ /dev/null
-from abc import abstractmethod
-
-class ConfigLoadBaseException(Exception):
- def __init__(self, msg, node, inner=None) -> None:
- super().__init__(msg)
-
- self.__msg = msg
- self.__node = node
- self.__inner = inner
-
- def __str__(self) -> str:
- return super().__str__()
-
-class ConfigLoadException(ConfigLoadBaseException):
- def __init__(self, msg, node = None, inner=None) -> None:
- super().__init__(msg, node, inner)
-
-class ConfigTypeCheckError(ConfigLoadBaseException):
- def __init__(self, msg, node, inner=None) -> None:
- super().__init__(msg, node, inner)
-
-class ConfigIOProvider:
- def __init__(self) -> None:
- pass
-
- @abstractmethod
- def export(self, env, config_dict):
- pass
-
- @abstractmethod
- def save(self, env, config_dict):
- pass
-
- @abstractmethod
- def load(self, env):
- pass
-
-
-class TypeProviderBase:
- def __init__(self, type) -> None:
- self._type = type
-
- @abstractmethod
- def check(self, val):
- return True
-
- @abstractmethod
- def serialise(self, val):
- pass
-
- @abstractmethod
- def deserialise(self, val):
- pass
-
- @abstractmethod
- def parse_input(self, input_str):
- pass
-
- @abstractmethod
- def to_input(self, val):
- pass
-
- def allow_none(self):
- return False
-
- @staticmethod
- def typedef_matched(typedef):
- return False
-
- @abstractmethod
- def __str__(self) -> str:
- pass
-
-class Renderable:
- def __init__(self) -> None:
- pass
-
- @abstractmethod
- def render(self, rctx):
- pass
-
-class RenderContext:
- def __init__(self) -> None:
- pass
-
- @abstractmethod
- def add_expandable(self, label, node, on_expand_cb):
- pass
-
- @abstractmethod
- def add_field(self, label, node):
- pass
-
-class InteractiveRenderer(RenderContext):
- def __init__(self) -> None:
- super().__init__()
-
- @abstractmethod
- def render_loop(self):
- pass
-
-
-class LConfigBuiltin:
- def __init__(self, f, is_contextual, rename=None, caller_type=[]):
- self.name = f.__name__ if not rename else rename
- self.__f = f
- self.__contextual = is_contextual
- self.__caller_type = caller_type
-
- def __call__(self, env, *args, **kwds):
- if not self.__contextual:
- return self.__f(env, *args, **kwds)
-
- caller = env.callframe_at(0)
- f_name = self.__f.__name__
-
- if not caller:
- raise ConfigLoadException(
- f"contextual function '{f_name}' can not be called contextless",
- None
- )
-
- if self.__caller_type and not any([isinstance(caller, x) for x in self.__caller_type]):
- raise ConfigLoadException(
- f"caller of '{f_name}' ({caller}) must have type of any {self.__caller_type}",
- caller
- )
-
- return self.__f(env, caller, *args, **kwds)
-
-def contextual(name=None, caller_type=[]):
- def wrapper(func):
- return LConfigBuiltin(func, True, name, caller_type)
- return wrapper
-
-def builtin(name=None):
- def wrapper(func):
- return LConfigBuiltin(func, False, name)
- return wrapper
+++ /dev/null
-from .api import contextual, builtin
-from .lcnodes import LCFuncNode, LCTermNode, LCModuleNode
-from lib.utils import join_path
-import os
-
-@contextual()
-def v(env, caller, term):
- node = env.lookup_node(term.__name__)
- env.dependency().add(node, caller)
- return env.resolve_symbol(node.get_name())
-
-@contextual(caller_type=[LCModuleNode])
-def include(env, caller, file):
- fobj = caller.get_fo()
- path = os.path.dirname(fobj.filename())
- path = join_path(path, file)
-
- if os.path.isdir(path):
- path = join_path(path, "LConfig")
-
- env.resolve_module(path)
-
-@contextual("type", caller_type=[LCTermNode])
-def term_type(env, caller, type):
- caller.set_type(type)
-
-@contextual("add_to_collection", caller_type=[LCFuncNode])
-def parent(env, caller, ref):
- sym = env.lookup_node(ref.__name__)
-
- caller.set_parent(sym)
-
-@contextual(caller_type=[LCTermNode])
-def default(env, caller, val):
- caller.set_default(val)
-
-@contextual(caller_type=[LCTermNode])
-def set_value(env, caller, val):
- caller.set_value(val)
-
-@builtin()
-def env(env, key, default=None):
- return os.getenv(key, default)
\ No newline at end of file
+++ /dev/null
-import os.path as path
-import ast, json
-
-from .lcnodes import LCModuleNode, LCTermNode
-from .api import (
- ConfigLoadException,
- Renderable
-)
-
-class LConfigFile:
- def __init__(self,
- env,
- file) -> None:
- self.__env = env
- self.__file = env.to_wspath(file)
-
- with open(file) as f:
- tree = ast.parse(f.read(), self.__file, mode='exec')
- self.__module = LCModuleNode(self, tree)
-
- def filename(self):
- return self.__file
-
- def env(self):
- return self.__env
-
- def root_module(self):
- return self.__module
-
- def compile_astns(self, astns):
- if not isinstance(astns, list):
- astns = [astns]
-
- return compile(
- ast.Module(body=astns, type_ignores=[]),
- self.__file, mode='exec')
-
-class DependencyGraph:
- def __init__(self) -> None:
- self._edges = {}
-
- def add(self, dependent, dependee):
- if dependent in self._edges:
- if dependee in self._edges[dependent]:
- return
- self._edges[dependent].add(dependee)
- else:
- self._edges[dependent] = set([dependee])
-
- if self.__check_loop(dependee):
- raise ConfigLoadException(
- f"loop dependency detected: {dependent.get_name()}",
- dependee)
-
- def __check_loop(self, start):
- visited = set()
- q = [start]
- while len(q) > 0:
- current = q.pop()
- if current in visited:
- return True
-
- visited.add(current)
- if current not in self._edges:
- continue
- for x in self._edges[current]:
- q.append(x)
-
- return False
-
- def cascade(self, start):
- q = [start]
- while len(q) > 0:
- current = q.pop()
- if current in self._edges:
- for x in self._edges[current]:
- q.append(x)
- if current != start:
- current.evaluate()
-
-class ConfigTypeFactory:
- def __init__(self) -> None:
- self.__providers = []
-
- def regitser(self, provider_type):
- self.__providers.append(provider_type)
-
- def create(self, typedef):
- for provider in self.__providers:
- if not provider.typedef_matched(typedef):
- continue
- return provider(typedef)
-
- raise ConfigLoadException(
- f"no type provider defined for type: {typedef}", None)
-
-class LConfigEvaluationWrapper:
- def __init__(self, env, node) -> None:
- self.__env = env
- self.__node = node
-
- def __enter__(self):
- self.__env.push_executing_node(self.__node)
- return self
-
- def __exit__(self, type, value, tb):
- self.__env.pop_executing_node()
-
- def evaluate(self):
- return self.__env.evaluate_node(self.__node)
-
-class LConfigEnvironment(Renderable):
- def __init__(self, workspace, config_io) -> None:
- super().__init__()
-
- self.__ws_path = path.abspath(workspace)
- self.__exec_globl = {}
- self.__eval_stack = []
- self.__lc_modules = []
- self.__config_val = {}
- self.__node_table = {}
- self.__deps_graph = DependencyGraph()
- self.__type_fatry = ConfigTypeFactory()
- self.__config_io = config_io
-
- def to_wspath(self, _path):
- _path = path.abspath(_path)
- return path.relpath(_path, self.__ws_path)
-
- def register_builtin_func(self, func_obj):
- call = (lambda *arg, **kwargs:
- func_obj(self, *arg, **kwargs))
-
- self.__exec_globl[func_obj.name] = call
-
- def resolve_module(self, file):
- fo = LConfigFile(self, file)
- self.__lc_modules.insert(0, (fo.root_module()))
-
- def evaluate_node(self, node):
- name = node.get_name()
-
- return self.get_symbol(name)()
-
- def eval_context(self, node):
- return LConfigEvaluationWrapper(self, node)
-
- def push_executing_node(self, node):
- self.__eval_stack.append(node)
-
- def pop_executing_node(self):
- return self.__eval_stack.pop()
-
- def register_node(self, node):
- self.__node_table[node.get_name()] = node
-
- l = {}
- try:
- self.push_executing_node(node)
- exec(node.get_co(), self.__exec_globl, l)
- self.pop_executing_node()
- except Exception as e:
- raise ConfigLoadException("failed to load", node, e)
-
- self.__exec_globl.update(l)
-
- def lookup_node(self, name):
- if name not in self.__node_table:
- raise ConfigLoadException(f"node '{name}' undefined")
- return self.__node_table[name]
-
- def type_factory(self):
- return self.__type_fatry
-
- def update_value(self, key, value):
- self.__config_val[key] = value
-
- def get_symbol(self, name):
- if name not in self.__exec_globl:
- raise ConfigLoadException(f"unknown symbol '{name}'")
- return self.__exec_globl[name]
-
- def callframe_at(self, traverse_level):
- try:
- return self.__eval_stack[traverse_level - 1]
- except:
- return None
-
- def lookup_value(self, key):
- return self.__config_val[key]
-
- def resolve_symbol(self, sym):
- term_node = self.__node_table[sym]
- if isinstance(term_node, LCTermNode):
- if not term_node.is_ready():
- term_node.evaluate()
- return term_node.get_value()
- raise Exception(f"fail to resolve symbol: {sym}, not resolvable")
-
- def dependency(self):
- return self.__deps_graph
-
- def update(self):
- for mod in self.__lc_modules:
- mod.evaluate()
-
- def export(self):
- self.__config_io.export(self, self.__config_val)
-
- def save(self, _path = ".config.json"):
- data = {}
- for mod in self.__lc_modules:
- mod.serialise(data)
-
- with open(_path, 'w') as f:
- json.dump(data, f)
-
- def load(self, _path = ".config.json"):
- if not path.exists(_path):
- return
-
- data = {}
- with open(_path, 'r') as f:
- data.update(json.load(f))
-
- for mod in self.__lc_modules:
- mod.deserialise(data)
-
- self.update()
-
- def render(self, rctx):
- for mod in self.__lc_modules:
- mod.render(rctx)
\ No newline at end of file
+++ /dev/null
-from .api import (
- ConfigLoadException,
- ConfigTypeCheckError,
- Renderable
-)
-
-from .utils import (
- extract_decorators,
- to_displayable
-)
-
-import ast, textwrap
-from abc import abstractmethod
-
-
-
-class LCNode(Renderable):
- def __init__(self, fo, astn):
- super().__init__()
-
- self._fo = fo
- self._env = fo.env()
- self._astn = self.prune_astn(astn)
- self._co = self.compile_astn()
- self._parent = None
-
- self._env.register_node(self)
-
- def prune_astn(self, astn):
- return astn
-
- def compile_astn(self):
- return self._fo.compile_astns(self._astn)
-
- def get_co(self):
- return self._co
-
- def get_fo(self):
- return self._fo
-
- def set_parent(self, new_parent):
- self._parent = new_parent
-
- def parent(self):
- return self._parent
-
- def add_child(self, node):
- pass
-
- def remove_child(self, node):
- pass
-
- def get_name(self):
- return f"LCNode: {self.__hash__()}"
-
- @abstractmethod
- def evaluate(self):
- pass
-
- def render(self, rctx):
- pass
-
- def deserialise(self, dict):
- pass
-
- def serialise(self, dict):
- pass
-
-
-
-class LCModuleNode(LCNode):
- def __init__(self, fo, astn: ast.Module):
- self.__nodes = {}
-
- super().__init__(fo, astn)
-
- def get_name(self):
- return f"file: {self._fo.filename()}"
-
- def prune_astn(self, astn: ast.Module):
- general_exprs = []
-
- for b in astn.body:
- if not isinstance(b, ast.FunctionDef):
- general_exprs.append(b)
- continue
-
- node = LCFuncNode.construct(self._fo, b)
- node.set_parent(self)
-
- self.add_child(node)
-
- return general_exprs
-
- def evaluate(self):
- with self._env.eval_context(self) as evalc:
- ls = list(self.__nodes.values())
- for node in ls:
- node.evaluate()
-
- def add_child(self, node):
- self.__nodes[node] = node
-
- def remove_child(self, node):
- if node in self.__nodes:
- del self.__nodes[node]
-
- def deserialise(self, dict):
- for node in self.__nodes:
- node.deserialise(dict)
-
- def serialise(self, dict):
- for node in self.__nodes:
- node.serialise(dict)
-
- def render(self, rctx):
- for node in self.__nodes:
- node.render(rctx)
-
-class LCFuncNode(LCNode):
- def __init__(self, fo, astn) -> None:
- self._decors = {}
- self._name = None
- self._help = ""
- self._display_name = None
- self._enabled = True
-
- super().__init__(fo, astn)
-
- def prune_astn(self, astn: ast.FunctionDef):
- self._name = astn.name
- self._display_name = to_displayable(self._name)
-
- maybe_doc = astn.body[0]
- if isinstance(maybe_doc, ast.Expr):
- if isinstance(maybe_doc.value, ast.Constant):
- self._help = maybe_doc.value.value
- self._help = textwrap.dedent(self._help)
-
- decors = extract_decorators(astn)
- for name, args, kwargs in decors:
- self._decors[name] = (args, kwargs)
-
- (args, _) = self._decors[self.mapped_name()]
- if args:
- self._display_name = args[0]
-
- astn.decorator_list.clear()
- return astn
-
- def get_name(self):
- return self._name
-
- def get_display_name(self):
- return self._display_name
-
- def enabled(self):
- if isinstance(self._parent, LCFuncNode):
- return self._enabled and self._parent.enabled()
- return self._enabled
-
- def help_prompt(self):
- return self._help
-
- def evaluate(self):
- with self._env.eval_context(self) as evalc:
- result = evalc.evaluate()
- self._enabled = True if result is None else result
-
- @staticmethod
- def mapped_name(self):
- return None
-
- @staticmethod
- def construct(fo, astn: ast.FunctionDef):
- nodes = [
- LCCollectionNode,
- LCGroupNode,
- LCTermNode
- ]
-
- for node in nodes:
- if extract_decorators(astn, node.mapped_name(), True):
- return node(fo, astn)
-
- raise ConfigLoadException(
- f"unknown type for astn type: {type(astn)}")
-
- def set_parent(self, new_parent):
- if self._parent:
- self._parent.remove_child(self)
-
- new_parent.add_child(self)
- super().set_parent(new_parent)
-
-
-class LCTermNode(LCFuncNode):
- def __init__(self, fo, astn) -> None:
- self._value = None
- self._default = None
- self._type = None
- self._rdonly = False
- self._ready = False
-
- super().__init__(fo, astn)
-
- @staticmethod
- def mapped_name():
- return "Term"
-
- def prune_astn(self, astn: ast.FunctionDef):
- astn = super().prune_astn(astn)
-
- self._rdonly = "ReadOnly" in self._decors
-
- return astn
-
- def set_type(self, type_def):
- self._type = self._env.type_factory().create(type_def)
-
- def get_type(self):
- return self._type
-
- def __assert_type(self, val):
- if not self._type:
- raise ConfigLoadException(
- f"config: {self._name} must be typed", self)
-
- if self._type.check(val):
- return
-
- raise ConfigTypeCheckError(
- f"value: {val} does not match type {self._type}", self)
-
- def set_value(self, val):
- if self._rdonly:
- return
-
- if isinstance(val, str):
- val = self._type.parse_input(val)
-
- self.__assert_type(val)
- self._value = val
-
- self._ready = True
- self.__update_value()
- self._env.dependency().cascade(self)
-
-
- def set_default(self, val):
- self.__assert_type(val)
- self._default = val
-
- if self._rdonly:
- self._value = val
-
- def get_value(self):
- return self._value
-
- def is_ready(self):
- return self._ready
-
- def evaluate(self):
- super().evaluate()
- self.__update_value()
-
- def read_only(self):
- return self._rdonly
-
- def render(self, rctx):
- if self.enabled():
- rctx.add_field(self._display_name, self)
-
- def serialise(self, dict):
- s_val = self._type.serialise(self._value)
- dict[self._name] = s_val
-
- def deserialise(self, dict):
- if self._name not in dict:
- return
-
- s_val = dict[self._name]
- v = self._type.deserialise(s_val)
- self.__assert_type(v)
- self._value = v
-
-
- def __update_value(self):
- v = self._value
-
- if not self.enabled():
- self._env.update_value(self._name, None)
- return
-
- if v is None:
- v = self._default
-
- if v is None and not self._type.allow_none():
- raise ConfigLoadException(
- f"config: {self._name} must have a value", self)
-
- self._value = v
- self._env.update_value(self._name, v)
-
-
-
-
-class LCGroupNode(LCFuncNode):
- def __init__(self, fo, astn) -> None:
- self._children = {}
-
- super().__init__(fo, astn)
-
- @staticmethod
- def mapped_name():
- return "Group"
-
- def prune_astn(self, astn: ast.FunctionDef):
- astn = super().prune_astn(astn)
-
- other_exprs = []
- for expr in astn.body:
- if not isinstance(expr, ast.FunctionDef):
- other_exprs.append(expr)
- continue
-
- node = LCFuncNode.construct(self._fo, expr)
- node.set_parent(self)
- self._children[node] = node
-
- if not other_exprs:
- other_exprs.append(
- ast.Pass(
- lineno=astn.lineno + 1,
- col_offset=astn.col_offset
- )
- )
-
- astn.body = other_exprs
- return astn
-
- def evaluate(self):
- old_enable = super().enabled()
- super().evaluate()
-
- new_enabled = super().enabled()
- if not new_enabled and old_enable == new_enabled:
- return
-
- with self._env.eval_context(self) as evalc:
- children = list(self._children.values())
- for child in children:
- child.evaluate()
-
- def render(self, rctx):
- for child in self._children.values():
- child.render(rctx)
-
- def serialise(self, dict):
- sub_dict = {}
- for child in self._children.values():
- child.serialise(sub_dict)
-
- dict[self._name] = sub_dict
-
- def deserialise(self, dict):
- if self._name not in dict:
- return
-
- sub_dict = dict[self._name]
- for child in self._children.values():
- child.deserialise(sub_dict)
-
- def add_child(self, node):
- self._children[node] = node
-
- def remove_child(self, node):
- if node in self._children:
- del self._children[node]
-
-
-class LCCollectionNode(LCGroupNode):
- def __init__(self, fo, astn) -> None:
- super().__init__(fo, astn)
-
- @staticmethod
- def mapped_name():
- return "Collection"
-
- def render(self, rctx):
- _super = super()
- rctx.add_expandable(
- self._display_name,
- self,
- lambda _ctx:
- _super.render(_ctx)
- )
\ No newline at end of file
+++ /dev/null
-from .api import TypeProviderBase
-from .utils import is_primitive, is_basic
-
-class PrimitiveType(TypeProviderBase):
- def __init__(self, type) -> None:
- super().__init__(type)
-
- @staticmethod
- def typedef_matched(typedef):
- return is_primitive(typedef)
-
- def check(self, val):
- return isinstance(val, self._type)
-
- def serialise(self, val):
- return str(val)
-
- def deserialise(self, val):
- if val.lower() == "true":
- return True
- elif val.lower() == "false":
- return False
-
- return self._type(val)
-
- def parse_input(self, input_str):
- return self.deserialise(input_str)
-
- def to_input(self, val):
- return self.serialise(val)
-
- def __str__(self) -> str:
- if isinstance(self._type, type):
- return f"any with type of {self._type}"
- return f"exact of value '{self._type}'"
-
-
-class MultipleChoiceType(PrimitiveType):
- def __init__(self, type) -> None:
- super().__init__(type)
-
- @staticmethod
- def typedef_matched(typedef):
- if not isinstance(typedef, list):
- return False
- return all([is_basic(x) for x in typedef])
-
- def check(self, val):
- if not is_basic(val):
- return False
- return val in self._type
-
- def parse_input(self, input_str):
- return super().parse_input(input_str)
-
- def deserialise(self, val):
- if val.lower() == "true":
- return True
- elif val.lower() == "false":
- return False
-
- for sv in self._type:
- if val != str(sv):
- continue
- return type(sv)(val)
-
- return None
-
- def allow_none(self):
- return None in self._type
-
- def __str__(self) -> str:
- accepted = [f" * {t}" for t in self._type]
- return "\n".join([
- "choose one:",
- *accepted
- ])
+++ /dev/null
-import ast
-
-def extract_decorators(fn_ast: ast.FunctionDef, fname = None, only_name=False):
- decors = fn_ast.decorator_list
- results = []
- for decor in decors:
- _args = []
- _kwargs = []
- if isinstance(decor, ast.Name):
- name = decor.id
- elif isinstance(decor, ast.Call):
- if not isinstance(decor.func, ast.Name):
- continue
- name = decor.func.id
- _args = decor.args
- _kwargs = decor.keywords
- else:
- continue
-
- if fname and name != fname:
- continue
-
- if only_name:
- results.append(name)
- continue
-
- unpacked = []
- kwargs = {}
- for arg in _args:
- if isinstance(arg, ast.Constant):
- unpacked.append(arg.value)
-
- for kwarg in _kwargs:
- if isinstance(kwarg.value, ast.Constant):
- kwargs[kwarg.arg] = kwarg.value.value
-
- results.append((name, unpacked, kwargs))
-
- return results
-
-def to_displayable(name):
- l = name.strip().split('_')
- for i, s in enumerate(l):
- l[i] = str.upper(s[0]) + s[1:]
- return " ".join(l)
-
-def is_primitive(val):
- return val in [int, str, bool]
-
-def is_basic(val):
- basic = [int, str, bool]
- return (
- val in basic or
- any([isinstance(val, x) for x in basic])
- )
\ No newline at end of file
--- /dev/null
+import ast
+
+from lib.utils import ConfigAST, ConfigASTVisitor
+from .common import NodeProperty, ConfigNodeError, ValueTypeConstrain
+from .nodes import GroupNode, TermNode
+from .sanitiser import TreeSanitiser
+
+class NodeBuilder(ConfigASTVisitor):
+    """
+    Turn the function definitions of a config AST into config nodes.
+
+    A function with a return annotation becomes a TermNode, one without
+    becomes a GroupNode; nested definitions become children of the
+    enclosing group. Everything that is not a config function is collected
+    separately and handed to the environment as its execution context.
+    """
+
+    def __init__(self, env):
+        super().__init__()
+
+        self.__env = env
+        # stack of config nodes currently being built (innermost last)
+        self.__level = []
+        # AST nodes that do not describe config nodes
+        self.__noncfg_astns = []
+
+    def __pop_and_merge(self):
+        """
+        Finish the innermost node: pop it and, unless it was a top-level
+        node, attach it to the node that is now on top of the stack.
+
+        Raises TypeError when that parent is not a GroupNode.
+        """
+        if not self.__level:
+            return
+
+        node = self.__level.pop()
+        if not self.__level:
+            # top-level node: no parent to merge into
+            return
+
+        prev = self.__level[-1]
+
+        # An explicit check instead of `assert`: this validates the shape
+        # of user-written config and must survive `python -O`.
+        if not isinstance(prev, GroupNode):
+            raise TypeError("nested config node requires a group as its parent")
+
+        prev.add_child(node)
+
+    def __push(self, cfg_node):
+        """Make `cfg_node` the innermost node under construction."""
+        self.__level.append(cfg_node)
+
+    def __check_literal_expr(self, node):
+        """Tell whether `node` is a bare string-literal statement."""
+        return (
+            isinstance(node, ast.Expr)
+            and isinstance(node.value, ast.Constant)
+            and isinstance(node.value.value, str)
+        )
+
+    def _visit_fndef(self, node):
+        """
+        Build the config node for function definition `node`, recursing
+        into nested definitions, and register it with the environment.
+
+        Definitions carrying a `__builtin` attribute are not config nodes
+        and are only recorded. A ConfigNodeError passes through unchanged;
+        any other exception is re-raised as the node's config error, with
+        the original exception attached as its cause.
+        """
+        if hasattr(node, "__builtin"):
+            self.__noncfg_astns.append(node)
+            return
+
+        # a return annotation marks a term, its absence a group
+        cfgn_type = TermNode if node.returns else GroupNode
+        cfgn = cfgn_type(self.__env, self.current_file(), node.name)
+
+        self.__push(cfgn)
+
+        try:
+            for decor in node.decorator_list:
+                cfgn.apply_decorator(decor)
+
+            astns = []
+            help_parts = []
+            for sub in node.body:
+                if isinstance(sub, ast.FunctionDef):
+                    self._visit_fndef(sub)
+                    continue
+
+                # bare string statements make up the help text
+                if self.__check_literal_expr(sub):
+                    help_parts.append(sub.value.value)
+                    continue
+
+                astns.append(sub)
+
+            NodeProperty.Token[cfgn] = node
+            NodeProperty.HelpText[cfgn] = "".join(help_parts)
+
+            if cfgn_type is TermNode:
+                NodeProperty.Type[cfgn] = ValueTypeConstrain(cfgn, node.returns)
+
+            # trailing `pass` keeps the body non-empty when every statement
+            # was a nested definition or a help string
+            astns.append(ast.Pass())
+            cfgn.set_node_body(astns)
+
+            self.__env.register_node(cfgn)
+
+        except ConfigNodeError:
+            raise
+        except Exception as e:
+            msg = e.args[0] if e.args else type(e).__name__
+            raise cfgn.config_error(msg) from e
+
+        self.__pop_and_merge()
+
+    def visit(self, node):
+        """
+        Run the inherited visit, then record `node` as a non-config node
+        unless it is a module or a function definition.
+        """
+        super().visit(node)
+
+        if isinstance(node, ast.Module):
+            return
+
+        if not isinstance(node, ast.FunctionDef):
+            self.__noncfg_astns.append(node)
+
+    @staticmethod
+    def build(env, rootfile):
+        """
+        Rebuild all config nodes of `env` from the config tree rooted at
+        `rootfile`: reset the environment, sanitise the tree, build the
+        nodes, then install the collected non-config statements as the
+        execution context and relocate children.
+        """
+        builder = NodeBuilder(env)
+        # named `tree` so the `ast` module is not shadowed in here
+        tree = ConfigAST(rootfile)
+
+        env.reset()
+        tree.visit(TreeSanitiser())
+        tree.visit(builder)
+
+        env.set_exec_context(builder.__noncfg_astns)
+        env.relocate_children()
\ No newline at end of file
--- /dev/null
+import ast
+
+from lib.utils import SourceLogger, Schema
+
+class NodeProperty:
+    """Named accessors for the properties stored on a config node."""
+
+    class PropertyAccessor:
+        """
+        Subscript-style view of one property key.
+
+        `Accessor[node]` reads via node.get_property(), assignment writes
+        via node.set_property(), and `del` writes None.
+        """
+
+        def __init__(self, key):
+            self.__name = key
+
+        def __getitem__(self, node):
+            prop = self.__name
+            return node.get_property(prop)
+
+        def __setitem__(self, node, value):
+            prop = self.__name
+            node.set_property(prop, value)
+
+        def __delitem__(self, node):
+            self.__setitem__(node, None)
+
+    # Keys prefixed with "$" cannot be spelled as a Python identifier.
+    Token = PropertyAccessor("$token")
+    Value = PropertyAccessor("$value")
+    Type = PropertyAccessor("$type")
+    Enabled = PropertyAccessor("$enabled")
+    Status = PropertyAccessor("$status")
+    Dependency = PropertyAccessor("$depends")
+
+    Hidden = PropertyAccessor("hidden")
+    Parent = PropertyAccessor("parent")
+    Label = PropertyAccessor("label")
+    Readonly = PropertyAccessor("readonly")
+    HelpText = PropertyAccessor("help")
+
+class ConfigNodeError(Exception):
+    """
+    Error attributed to a config node.
+
+    The string form is "<file>:<line>:<col>: fatal error: <name>: <msg>",
+    with the position taken from the node's token property.
+    """
+
+    def __init__(self, node, *args):
+        super().__init__(*args)
+
+        self.__node = node
+        self.__msg = " ".join([str(x) for x in args])
+
+    def __str__(self):
+        node = self.__node
+        tok = NodeProperty.Token[node]
+        # The token may not be attached yet; formatting the error must
+        # never raise a second exception, so fall back to "?".
+        line = getattr(tok, "lineno", "?")
+        col = getattr(tok, "col_offset", "?")
+        return (
+            f"{node._filename}:{line}:{col}:" +
+            f" fatal error: {node._name}: {self.__msg}"
+        )
+
+
+class ValueTypeConstrain:
+    """
+    Value constraint built from the return annotation of a config term.
+
+    Accepted annotations:
+      * a constant           - the value must equal it
+      * a name from TypeMap  - the value must be an instance of that type
+      * operands joined by | - the value must match any operand; an
+                               operand is a constant or a TypeMap name
+    """
+    TypeMap = {
+        "str": str,
+        "int": int,
+        "bool": bool,
+    }
+
+    BinOpOr = Schema(ast.BinOp, op=ast.BitOr)
+
+    def __init__(self, node, rettype):
+        """
+        Args:
+            node: config node owning the annotation (used for warnings).
+            rettype: AST of the annotation.
+
+        Raises:
+            Exception: for an unsupported annotation or unknown type name.
+        """
+        self.__node = node
+        self.__raw = rettype
+
+        if isinstance(rettype, ast.Expr):
+            value = rettype.value
+        else:
+            value = rettype
+
+        if isinstance(value, ast.Constant):
+            self.schema = Schema(value.value)
+
+        elif isinstance(value, ast.Name):
+            self.schema = Schema(self.__parse_type(value.id))
+
+        elif isinstance(value, ast.BinOp):
+            unions = self.__parse_binop(value)
+            self.schema = Schema(Schema.Union(*unions))
+
+        else:
+            raise Exception(
+                f"unsupported type definition: {ast.unparse(rettype)}")
+
+    def __parse_type(self, type_name):
+        if type_name not in ValueTypeConstrain.TypeMap:
+            raise Exception(f"unknown type: {type_name}")
+
+        return ValueTypeConstrain.TypeMap[type_name]
+
+    def __parse_binop(self, oproot):
+        """Flatten a tree of `|` operations into a list of alternatives."""
+        if isinstance(oproot, ast.Constant):
+            return [oproot.value]
+
+        if isinstance(oproot, ast.Name):
+            # A type name is a legitimate alternative (e.g. int | "auto");
+            # it must not be reported as a non-OR operator and dropped.
+            return [self.__parse_type(oproot.id)]
+
+        if ValueTypeConstrain.BinOpOr != oproot:
+            SourceLogger.warn(
+                self.__node, self.__raw,
+                "only OR is allowed. Ignoring...")
+            return []
+
+        return self.__parse_binop(oproot.left) \
+            + self.__parse_binop(oproot.right)
+
+    def check_type(self, value):
+        return self.schema.match(value)
+
+    def ensure_type(self, node, val):
+        """Raise node.config_error(...) when `val` violates the constraint."""
+        if self.check_type(val):
+            return
+
+        raise node.config_error(
+            "unmatched type:",
+            f"expect: '{self.schema}',",
+            f"got: '{val}' ({type(val)})")
+
+class NodeDependency:
+    """Compiled dependency expression together with the names it reads."""
+
+    class SimpleWalker(ast.NodeVisitor):
+        """Appends the identifier of every visited Name to the owner."""
+
+        def __init__(self, deps):
+            super().__init__()
+            self.__deps = deps
+
+        def visit_Name(self, node):
+            self.__deps._names.append(node.id)
+
+    def __init__(self, nodes, expr: ast.expr):
+        self._names = []
+        self._expr = ast.unparse(expr)
+
+        located = ast.fix_missing_locations(expr)
+        wrapped = ast.Expression(located)
+        self.__exec = compile(wrapped, "", mode='eval')
+
+        collector = NodeDependency.SimpleWalker(self)
+        collector.visit(located)
+
+    def evaluate(self, value_tables) -> bool:
+        """Evaluate the expression with `value_tables` as its globals."""
+        return eval(self.__exec, value_tables)
+
+    @staticmethod
+    def try_create(node):
+        """Replace a raw AST dependency on `node` by a NodeDependency."""
+        expr = NodeProperty.Dependency[node]
+        if isinstance(expr, ast.expr):
+            NodeProperty.Dependency[node] = NodeDependency(node, expr)
\ No newline at end of file
--- /dev/null
+import ast, os
+
+from lib.utils import ConfigAST
+from .common import NodeProperty, NodeDependency
+from .nodes import GroupNode, TermNode
+
+
+class ConfigEnvironment:
+    """
+    Registry of config nodes plus the shared execution context in which
+    node bodies are evaluated.
+    """
+
+    def __init__(self):
+        self.__node_table = {}
+        self.__exec = None
+        self.__globals = {}
+        # Helpers merged into the globals visible to node bodies.
+        self.__custom_fn = {
+            "env": lambda x: os.environ.get(x)
+        }
+
+    def check_dependency(self, node) -> bool:
+        """
+        Return whether `node` is enabled and its dependency expression,
+        if any, evaluates truthy.
+
+        Raises:
+            ConfigNodeError: when the expression names an unknown node.
+        """
+        dep = NodeProperty.Dependency[node]
+
+        if not isinstance(dep, NodeDependency):
+            return node.enabled()
+
+        # NOTE(review): nodes depending on each other recurse without
+        # bound here - confirm whether cycles must be rejected earlier.
+        value_map = {}
+        for name in dep._names:
+            if name not in self.__node_table:
+                raise node.config_error(f"config: '{name}' does not exists")
+
+            n = self.__node_table[name]
+            value_map[name] = self.check_dependency(n)
+
+        return node.enabled() and dep.evaluate(value_map)
+
+    def register_node(self, cfg_node):
+        """Add `cfg_node` under its name; duplicate names are an error."""
+        name = cfg_node._name
+        if name in self.__node_table:
+            raise cfg_node.config_error(f"redefinition of '{name}'")
+
+        self.__node_table[name] = cfg_node
+
+    def get_node(self, name):
+        return self.__node_table.get(name)
+
+    def relocate_children(self):
+        """
+        Resolve parents given by name (a str) into the named group node.
+
+        Raises:
+            ConfigNodeError: when the name is unknown or not a GroupNode.
+        """
+        for node in self.__node_table.values():
+            parent = NodeProperty.Parent[node]
+            if not parent:
+                continue
+
+            if not isinstance(parent, str):
+                continue
+
+            if parent not in self.__node_table:
+                # Located error, consistent with register_node().
+                raise node.config_error(f"unknown parent: {parent}")
+
+            parent_node = self.__node_table[parent]
+            if not isinstance(parent_node, GroupNode):
+                raise node.config_error(f"not a valid parent: {parent}")
+
+            parent_node.add_child(node)
+
+    def set_exec_context(self, astns):
+        """Compile `astns`, minus file markers, as the shared module."""
+        markers = (ConfigAST.EnterFileMarker, ConfigAST.LeaveFileMarker)
+        filtered = [x for x in astns if not isinstance(x, markers)]
+
+        module = ast.Module(filtered, type_ignores=[])
+        module = ast.fix_missing_locations(module)
+
+        self.__exec = compile(module, "", mode='exec')
+
+    def __update_globals(self):
+        g = {}
+
+        exec(self.__exec, g)
+        # Custom helpers win over same-named definitions in the module.
+        self.__globals = {
+            **g,
+            **self.__custom_fn
+        }
+
+    def context(self):
+        return self.__globals
+
+    def refresh(self):
+        """Re-run the context, update every node, then re-check them."""
+        self.__update_globals()
+
+        for node in self.__node_table.values():
+            node.update()
+
+        for node in self.__node_table.values():
+            node.sanity_check()
+            NodeProperty.Enabled[node] = self.check_dependency(node)
+
+    def reset(self):
+        self.__node_table.clear()
+
+    def nodes(self):
+        for node in self.__node_table.values():
+            yield node
+
+    def top_levels(self):
+        for node in self.__node_table.values():
+            if NodeProperty.Parent[node] is None:
+                yield node
+
+    def terms(self):
+        for node in self.__node_table.values():
+            if isinstance(node, TermNode):
+                yield node
+
+import json
\ No newline at end of file
--- /dev/null
+import ast
+
+from .common import NodeProperty
+from lib.utils import Schema, SourceLogger
+
+class LazyLookup:
+    """
+    Table of lazy references indexed by their key.
+
+    Subscript access does not return the stored object: reading resolves
+    it through resolve_get(), assigning forwards to resolve_set().
+    """
+
+    def __init__(self):
+        self.__tab = {}
+
+    def put(self, lazy):
+        key = lazy.get_key()
+        self.__tab[key] = lazy
+
+    def get(self, key):
+        # Returns the stored object itself, or None when absent.
+        return self.__tab.get(key)
+
+    def __setitem__(self, key, val):
+        self.__tab[key].resolve_set(val)
+
+    def __getitem__(self, key):
+        return self.__tab[key].resolve_get()
+
+class Lazy:
+    """
+    Deferred access to a property of another config node, written in a
+    node body as `<target>.<type>` (currently only `.val`).
+    """
+    NodeValue = 'val'
+
+    LazyTypes = Schema.Union(NodeValue)
+    Syntax = Schema(ast.Attribute, attr=LazyTypes, value=ast.Name)
+
+    def __init__(self, source, type, target, env):
+        """
+        Args:
+            source: config node whose body contains the reference.
+            type: attribute name used in the reference.
+            target: name of the referenced config node.
+            env: environment used to look the target up.
+        """
+        self.target = target
+        self.type = type
+        self.env = env
+        self.source = source
+
+    def __resolve_type(self):
+        if self.type == Lazy.NodeValue:
+            return NodeProperty.Value
+        return None
+
+    def __lookup_target(self):
+        # get_node() returns None for an unknown name; report that as a
+        # located config error instead of failing on the None later.
+        node = self.env.get_node(self.target)
+        if node is None:
+            raise self.source.config_error(
+                f"config: '{self.target}' does not exist")
+        return node
+
+    def resolve_get(self):
+        """
+        Return the referenced property, updating the target first unless
+        it is already in the middle of an update.
+
+        Raises:
+            ConfigNodeError: when the target node is not registered.
+        """
+        accessor = self.__resolve_type()
+        if not accessor:
+            return None
+
+        node = self.__lookup_target()
+
+        status = NodeProperty.Status[node]
+        if status == "Updating":
+            # The target is being updated further up the call chain;
+            # updating it again would recurse, so use what is stored.
+            tok = NodeProperty.Token[self.source]
+            SourceLogger.warn(self.source, tok,
+                f"cyclic dependency detected: {self.source._name} <-> {self.target}. " +
+                f"Reusing cached value, maybe staled.")
+        else:
+            node.update()
+
+        return accessor[node]
+
+    def resolve_set(self, val):
+        """
+        Assign `val` to the referenced property.
+
+        Raises:
+            ConfigNodeError: when the target is not registered, is
+                readonly, or the reference type is not supported.
+        """
+        node = self.__lookup_target()
+        accessor = self.__resolve_type()
+
+        if NodeProperty.Readonly[node]:
+            raise self.source.config_error(
+                f"{self.target} is readonly")
+
+        if accessor:
+            accessor[node] = val
+        else:
+            raise self.source.config_error(
+                f"invalid type {self.type} for {self.target}")
+
+    def get_key(self):
+        return Lazy.get_key_from(self.type, self.target)
+
+    @staticmethod
+    def get_key_from(type, target):
+        return f"{type}${target}"
+
+    @staticmethod
+    def from_astn(cfgnode, astn):
+        """
+        Return the lazy-table key for `astn`, creating the table entry on
+        first use, or None when `astn` is not a lazy reference.
+        """
+        if Lazy.Syntax != astn:
+            return None
+
+        type_ = astn.attr
+        target = astn.value.id
+        key = Lazy.get_key_from(type_, target)
+
+        lz = cfgnode._lazy_table.get(key)
+        if lz:
+            return key
+
+        lz = Lazy(cfgnode, type_, target, cfgnode._env)
+        cfgnode._lazy_table.put(lz)
+
+        return key
--- /dev/null
+import ast
+
+from lib.utils import SourceLogger, Schema
+from .common import NodeProperty, ConfigNodeError, NodeDependency
+from .lazy import LazyLookup
+from .rewriter import ConfigNodeASTRewriter
+
+
+class ConfigDecorator:
+    # AST shapes of the decorators understood by ConfigNode.apply_decorator.
+
+    # @"text" : any constant
+    Label = Schema(ast.Constant)
+    # @readonly / @hidden / @flag : bare names
+    Readonly = Schema(ast.Name, id="readonly")
+    Hidden = Schema(ast.Name, id="hidden")
+    Flag = Schema(ast.Name, id="flag")
+    # @(parent := name)
+    SetParent = Schema(
+        ast.NamedExpr,
+        target=Schema(ast.Name, id="parent"),
+        value=Schema(ast.Name))
+    # @(key := name) : also matches the SetParent shape, so SetParent has
+    # to be tested first.
+    SetProperty = Schema(
+        ast.NamedExpr,
+        target=Schema(ast.Name),
+        value=Schema(ast.Name))
+
+
+class ConfigNode:
+    """
+    Base class of config nodes: a property bag plus a compiled body that
+    is executed on update().
+    """
+
+    def __init__(self, env, filename, name):
+        self.__props = {}
+        self.__exec = None
+        self._env = env
+        self._filename = filename
+        self._lazy_table = LazyLookup()
+
+        self._name = name
+
+        NodeProperty.Enabled[self] = True
+        NodeProperty.Hidden[self] = False
+        NodeProperty.Readonly[self] = False
+        NodeProperty.Status[self] = "Empty"
+
+    def set_node_body(self, ast_nodes, rewriter = ConfigNodeASTRewriter):
+        """
+        Rewrite `ast_nodes` and compile them as the body of this node.
+
+        The statements are wrapped into a function `__fn_<name>` whose
+        return value is assigned to `_result`.
+        """
+        new_ast = rewriter(self).visit(ast.Module(ast_nodes))
+        NodeDependency.try_create(self)
+
+        fn_name = f"__fn_{self._name}"
+        args = ast.arguments([], [], None, [], [], None, [])
+        module = ast.Module([
+            ast.FunctionDef(fn_name, args, new_ast.body, []),
+            ast.Assign(
+                [ast.Name("_result", ctx=ast.Store())],
+                ast.Call(ast.Name(fn_name,ctx=ast.Load()), [], [])
+            )
+        ], [])
+
+        module = ast.fix_missing_locations(module)
+        self.__exec = compile(module, self._filename, mode='exec')
+
+
+    def apply_decorator(self, decor):
+        """Apply one decorator AST; unknown ones only print a warning."""
+        if ConfigDecorator.Label == decor:
+            NodeProperty.Label[self] = str(decor.value)
+
+        elif ConfigDecorator.Readonly == decor:
+            NodeProperty.Readonly[self] = True
+
+        elif ConfigDecorator.Hidden == decor:
+            NodeProperty.Hidden[self] = True
+
+        elif ConfigDecorator.SetParent == decor:
+            NodeProperty.Parent[self] = decor.value.id
+
+        elif ConfigDecorator.Flag == decor:
+            NodeProperty.Hidden[self] = True
+            NodeProperty.Readonly[self] = True
+
+        elif ConfigDecorator.SetProperty == decor:
+            self.set_property(decor.target.id, decor.value.id)
+
+        else:
+            fname = self._filename
+            line = decor.lineno
+            col = decor.col_offset
+            msg = f"unknown decorator: {ast.unparse(decor)}"
+            msg = SourceLogger.fmt_warning(fname, line, col, msg)
+            print(msg)
+
+    def update(self):
+        """
+        Execute the compiled body and return its result.
+
+        Status is "Updating" while running, then "Latest" or "Error".
+
+        Raises:
+            ConfigNodeError: for any failure inside the body.
+        """
+        self.update_status("Updating")
+
+        try:
+            env = self._env
+            local = {}
+            globl = {
+                **env.context(),
+                "__lzLut__": self._lazy_table
+            }
+
+            exec(self.__exec, globl, local)
+            self.update_status("Latest")
+
+            return local["_result"]
+
+        except ConfigNodeError:
+            self.update_status("Error")
+            raise
+
+        except Exception as e:
+            self.update_status("Error")
+            # Preserve the underlying traceback as the cause.
+            raise self.config_error(e) from e
+
+    def sanity_check(self):
+        pass
+
+    def get_property(self, key):
+        return self.__props.get(key)
+
+    def set_property(self, key, value):
+        """Store `value` under `key`; None removes the property."""
+        if value is None:
+            # pop() rather than del: clearing a property that was never
+            # set (e.g. a term whose body returned None) is not an error.
+            self.__props.pop(key, None)
+        else:
+            self.__props[key] = value
+
+    def enabled(self):
+        """
+        Combine the Enabled flag with a boolean value (if the value is a
+        bool) and with the enabled state of a resolved parent node.
+        """
+        val = NodeProperty.Value[self]
+        en = NodeProperty.Enabled[self]
+        parent = NodeProperty.Parent[self]
+
+        if isinstance(val, bool):
+            en = en and val
+
+        if isinstance(parent, ConfigNode):
+            en = en and parent.enabled()
+
+        return en
+
+    def config_error(self, *args):
+        return ConfigNodeError(self, *args)
+
+    def update_status(self, status):
+        NodeProperty.Status[self] = status
+
+
+class GroupNode(ConfigNode):
+    """Config node that owns child nodes, indexed by child name."""
+
+    def __init__(self, env, filename, name):
+        super().__init__(env, filename, name)
+
+        self._subnodes = {}
+
+    def add_child(self, node):
+        """Adopt `node`; a child name that is already present is ignored."""
+        child_name = node._name
+        if child_name not in self._subnodes:
+            NodeProperty.Parent[node] = self
+            self._subnodes[child_name] = node
+
+
+class TermNode(ConfigNode):
+    """Config node holding a value checked against its type constraint."""
+
+    def __init__(self, env, filename, name):
+        super().__init__(env, filename, name)
+
+    def update(self):
+        """
+        Run the body and store its result as the value when the node is
+        readonly or has no value yet; return the result either way.
+        """
+        computed = super().update()
+
+        if NodeProperty.Readonly[self] or NodeProperty.Value[self] is None:
+            NodeProperty.Value[self] = computed
+
+        return computed
+
+    def sanity_check(self):
+        """Validate the current value against the Type property."""
+        current = NodeProperty.Value[self]
+        NodeProperty.Type[self].ensure_type(self, current)
+
+        super().sanity_check()
+
--- /dev/null
+import ast
+
+from lib.utils import Schema
+from .lazy import Lazy
+from .common import NodeProperty
+
+class ConfigNodeASTRewriter(ast.NodeTransformer):
+    """
+    Rewrites the body of a config node: lazy attribute references become
+    `__lzLut__[key]` subscripts and `require(expr)` statements are
+    removed and folded into the node's dependency expression.
+    """
+    Depend = Schema(
+        ast.Call,
+        func=Schema(ast.Name, id='require'),
+        args=[ast.expr])
+
+    def __init__(self, cfg_node):
+        super().__init__()
+
+        self.__cfg_node = cfg_node
+
+    def __gen_accessor(self, orig):
+        lazy_key = Lazy.from_astn(self.__cfg_node, orig)
+        if lazy_key:
+            lookup = ast.Name("__lzLut__", ctx=ast.Load())
+            return ast.Subscript(
+                value=lookup,
+                slice=ast.Constant(lazy_key),
+                ctx=orig.ctx,
+                lineno=orig.lineno,
+                col_offset=orig.col_offset
+            )
+
+        # Not a lazy reference: keep the attribute, rewrite its children.
+        return self.generic_visit(orig)
+
+    def __gen_dependency(self, node):
+        owner = self.__cfg_node
+        current = NodeProperty.Dependency[owner]
+        if current is None:
+            NodeProperty.Dependency[owner] = node
+            return
+
+        if not isinstance(current, ast.expr):
+            raise owner.config_error(
+                f"invalid dependency state: {current}")
+
+        # Several require() statements combine with a logical AND.
+        NodeProperty.Dependency[owner] = ast.BoolOp(ast.And(), [current, node])
+
+    def visit_Attribute(self, node):
+        return self.__gen_accessor(node)
+
+    def visit_Expr(self, node):
+        call = node.value
+
+        if ConfigNodeASTRewriter.Depend != call:
+            return self.generic_visit(node)
+
+        # Process marker functions
+        if call.func.id != "require":
+            return self.generic_visit(node)
+
+        self.__gen_dependency(call.args[0])
+
+        # Returning None drops the statement from the rewritten body.
+        return None
--- /dev/null
+import ast
+
+from lib.utils import Schema, ConfigASTVisitor, SourceLogger
+from .common import NodeProperty
+
+class TreeSanitiser(ConfigASTVisitor):
+    """
+    Pre-pass over the config AST that normalises function decorators:
+    `@native` functions are tagged with a "__builtin" attribute,
+    decorators of an unsupported shape are dropped with a warning, and
+    decorators sharing a name are collapsed to the last one.
+    """
+    DecoNative = Schema(ast.Name, id="native")
+    DecoName = Schema(ast.Name)
+    DecoNameE = Schema(ast.NamedExpr)
+    DecoCall = Schema(ast.Call)
+    DecoConst = Schema(ast.Constant)
+
+    def __init__(self):
+        super().__init__()
+        self.logger = SourceLogger(self)
+
+        # TODO
+        self.deco_rules = {}
+
+    def __warn(self, node, msg):
+        # Emit through fmt_warning so the message is attributed to the
+        # file that is currently being visited.
+        print(SourceLogger.fmt_warning(
+            self.current_file(), node.lineno, node.col_offset, msg))
+
+    def __sanitise_decorator(self, node: ast.FunctionDef):
+        deco_map = {}
+
+        for deco in node.decorator_list:
+            name = ""
+            if TreeSanitiser.DecoNative == deco:
+                setattr(node, "__builtin", True)
+                continue
+            elif TreeSanitiser.DecoCall == deco:
+                # NOTE(review): the key is the AST node itself rather
+                # than a name, so call decorators are never collapsed -
+                # confirm this is intended.
+                name = deco.func
+            elif TreeSanitiser.DecoConst == deco:
+                name = f"{deco.value}"
+            elif TreeSanitiser.DecoName == deco:
+                name = f"{deco.id}"
+            elif TreeSanitiser.DecoNameE == deco:
+                name = f"{ast.unparse(deco)}"
+            else:
+                self.__warn(deco, "invalid modifier type")
+                continue
+
+            deco_map[name] = deco
+
+        node.decorator_list = [x for x in deco_map.values()]
+
+        for desc, rule in self.deco_rules.items():
+            if rule(deco_map):
+                continue
+            self.__warn(node, desc)
+
+    def _visit_fndef(self, node):
+        self.__sanitise_decorator(node)
+        super()._visit_fndef(node)
+++ /dev/null
-import traceback, sys
-
-class InterpreterException(Exception):
- pass
-
-class Sandbox:
- def __init__(self, symbols) -> None:
- self.__syms = globals()
- self.__syms.update(symbols)
-
- def execute(self, file):
- with open(file) as f:
- return self.executes(f.read(), file)
-
- def executes(self, str, file=""):
- try:
- local_ctx = {}
- glb_ctx = self.__syms.copy()
- exec(str, glb_ctx, local_ctx)
- return local_ctx
- except SyntaxError as err:
- error_class = err.__class__.__name__
- detail = err.args[0]
- line_number = err.lineno
- except Exception as err:
- error_class = err.__class__.__name__
- detail = err.args[0]
- cl, exc, tb = sys.exc_info()
- line_number = traceback.extract_tb(tb)[1][1]
-
- print(f"LunaBuild failed: {error_class} at ./{file}:{line_number}, {detail}")
- raise InterpreterException("load error")
\ No newline at end of file
-import os
+import os, ast
+from pathlib import Path
def join_path(stem, path):
if os.path.isabs(path):
return path
- return os.path.join(stem, path)
\ No newline at end of file
+ return os.path.join(stem, path)
+
+
+class Schema:
+    """
+    Structural matcher for arbitrary objects.
+
+    `Schema(T, field=S, ...)` matches a value that matches `T` and whose
+    attributes `field` each match `S`. A pattern (`T` or `S`) can be:
+
+      * another Schema     - delegated to its match()
+      * a list             - matched element-wise against a list value
+      * a Schema.Union     - any alternative may match
+      * a type             - isinstance() check
+      * any other object   - equality check
+
+    Comparing a Schema with a non-Schema value via == / != performs a
+    match; comparing two Schema objects compares identity.
+    """
+
+    class Empty:
+        """Field pattern that tolerates the attribute being absent."""
+        def __init__(self):
+            pass
+
+    class Union:
+        """Pattern that matches when any alternative matches."""
+        def __init__(self, *args):
+            self.union = [*args]
+
+        def __str__(self):
+            strs = [Schema.get_type_str(t) for t in self.union]
+            return f"{' | '.join(strs)}"
+
+    def __init__(self, type, **kwargs):
+        self.__type = type
+        self.__fields = kwargs
+
+    def __match_list(self, actual, expect):
+        if len(actual) != len(expect):
+            return False
+
+        return all(self.__match(a, b) for a, b in zip(actual, expect))
+
+    def __match_union(self, actual, union):
+        return any(self.__match(actual, s) for s in union.union)
+
+    def __match(self, val, scheme):
+        if isinstance(scheme, Schema):
+            return scheme.match(val)
+
+        if isinstance(scheme, list) and isinstance(val, list):
+            return self.__match_list(val, scheme)
+
+        if isinstance(scheme, Schema.Union):
+            return self.__match_union(val, scheme)
+
+        if not isinstance(scheme, type):
+            return scheme == val
+
+        return isinstance(val, scheme)
+
+    def match(self, instance):
+        """Return True when `instance` satisfies the type and all fields."""
+        if not self.__match(instance, self.__type):
+            return False
+
+        for field, t in self.__fields.items():
+            if not hasattr(instance, field):
+                if isinstance(t, Schema.Empty):
+                    continue
+                return False
+
+            field_val = getattr(instance, field)
+
+            if not self.__match(field_val, t):
+                return False
+
+        return True
+
+    def __eq__(self, value):
+        if isinstance(value, Schema):
+            # Always return a real bool: object.__eq__ yields
+            # NotImplemented for distinct objects, which __ne__ below
+            # would then have to negate.
+            return self is value
+        return self.match(value)
+
+    def __ne__(self, value):
+        return not self.__eq__(value)
+
+    def __str__(self):
+        fields = ""
+        if len(self.__fields) > 0:
+            fields = ", ".join([
+                f"{name} :: {Schema.get_type_str(t)}"
+                for name, t in self.__fields.items()])
+            fields = "{%s}"%(fields)
+
+        return f"{Schema.get_type_str(self.__type)} {fields}"
+
+    def __repr__(self):
+        return self.__str__()
+
+    @staticmethod
+    def get_type_str(maybe_type):
+        if isinstance(maybe_type, type):
+            return maybe_type.__name__
+        if isinstance(maybe_type, str):
+            return f'"{maybe_type}"'
+        return str(maybe_type)
+
+class SourceLogger:
+    """
+    Prints diagnostics in "<file>:<line>:<col>: <level>: <msg>" form.
+
+    `warn` is callable in two shapes:
+
+      logger.warn(ast_node, fmt, *args)        on an instance; the file
+                                               name comes from the visitor
+      SourceLogger.warn(cfg_node, token, msg)  on the class; the file
+                                               name comes from cfg_node
+    """
+
+    class _WarnDispatch:
+        # Descriptor selecting the `warn` variant. Two plain methods with
+        # the same name cannot coexist in a class body: the later
+        # definition replaces the earlier one.
+        def __get__(self, logger, owner=None):
+            def warn(node, second, *rest):
+                # Instance shape: second argument is a format string.
+                if logger is not None and isinstance(second, str):
+                    return logger._warn_visiting(node, second, *rest)
+                # Class shape (also when invoked through an instance).
+                return SourceLogger.log("warning", node, second, *rest)
+            return warn
+
+    def __init__(self, visitor):
+        self.__visitor = visitor
+
+    def _warn_visiting(self, node, fmt, *args):
+        fname = self.__visitor.current_file()
+        line = node.lineno
+        coln = node.col_offset
+
+        print(SourceLogger.fmt_warning(fname, line, coln, fmt%args))
+
+    @staticmethod
+    def fmt_message(fname, line, col, level, msg):
+        return "%s:%d:%d: %s: %s"%(fname, line, col, level, msg)
+
+    @staticmethod
+    def fmt_warning(fname, line, col, msg):
+        return SourceLogger.fmt_message(fname, line, col, "warning", msg)
+
+    @staticmethod
+    def fmt_info(fname, line, col, msg):
+        return SourceLogger.fmt_message(fname, line, col, "info", msg)
+
+    @staticmethod
+    def fmt_error(fname, line, col, msg):
+        return SourceLogger.fmt_message(fname, line, col, "error", msg)
+
+    @staticmethod
+    def log(level, node, token, msg):
+        """Print `msg` located at `token`, followed by the token source."""
+        fname = node._filename
+        line = token.lineno
+        col = token.col_offset
+
+        print( SourceLogger.fmt_message(fname, line, col, level, msg))
+        print(f" at... {ast.unparse(token)}")
+
+    warn = _WarnDispatch()
+
+class ConfigAST:
+    """
+    Single AST assembled from a root config file and every file it pulls
+    in through `from . import a, b` / `from .pkg import a` statements.
+
+    The statements of each file are wrapped between an EnterFileMarker
+    and a LeaveFileMarker so visitors can track the current file.
+    """
+    ConfigImport = Schema(ast.ImportFrom, level=1)
+
+    class EnterFileMarker:
+        def __init__(self, filename = None):
+            self.name = filename
+
+    class LeaveFileMarker:
+        def __init__(self):
+            pass
+
+    def __init__(self, root_file, cfg_name = "LConfig"):
+        self.__tree = ast.Module([], type_ignores=[])
+        self.__cfg_name = cfg_name
+        self.__rootfile = Path(root_file)
+        # Files whose load is in progress, used to detect import cycles.
+        self.__loading = []
+
+        self.__load(self.__rootfile)
+
+    def __load(self, file: Path):
+        """
+        Append the statements of `file`, expanding config imports in place.
+
+        Raises:
+            RecursionError: when `file` is already being loaded, i.e. the
+                config files import each other in a cycle.
+        """
+        resolved = file.resolve()
+        if resolved in self.__loading:
+            raise RecursionError(f"circular config import: {file}")
+
+        parent = file.parent
+
+        with file.open('r') as f:
+            # Pass the file name so syntax errors point at the right file.
+            nodes = ast.parse(f.read(), filename=str(file)).body
+
+        relfile = str(file.relative_to(self.__rootfile.parent))
+        marker = ConfigAST.EnterFileMarker(relfile)
+        self.__append_tree(marker)
+
+        self.__loading.append(resolved)
+        try:
+            for node in nodes:
+                if ConfigAST.ConfigImport != node:
+                    self.__append_tree(node)
+                    continue
+
+                # `from . import x` has no module part.
+                module = node.module
+                module = "" if not module else module
+                path = parent.joinpath(*module.split('.'))
+                for alia in node.names:
+                    p = path / alia.name / self.__cfg_name
+                    self.__load(p)
+        finally:
+            self.__loading.pop()
+
+        self.__append_tree(ConfigAST.LeaveFileMarker())
+
+    def __append_tree(self, node):
+        self.__tree.body.append(node)
+
+    def visit(self, visitor):
+        visitor.visit(self.__tree)
+
+class ConfigASTVisitor:
+    """
+    Base visitor for a ConfigAST tree that tracks which file is being
+    visited by following the enter/leave file markers.
+    """
+
+    def __init__(self):
+        self.__markers = []
+
+    def _visit_fndef(self, node : ast.FunctionDef):
+        self._visit_subtree(node)
+
+    def _visit_leaf(self, node):
+        pass
+
+    def _visit_subtree(self, node):
+        for child in node.body:
+            self.visit(child)
+
+    def _visit_expr(self, node : ast.Expr):
+        pass
+
+    def current_file(self):
+        """Name carried by the innermost open file marker, or "<root>"."""
+        if self.__markers:
+            return self.__markers[-1].name
+        return "<root>"
+
+    def visit(self, node):
+        """Dispatch `node` to the matching hook."""
+        if isinstance(node, ConfigAST.EnterFileMarker):
+            self.__markers.append(node)
+        elif isinstance(node, ConfigAST.LeaveFileMarker):
+            self.__markers.pop()
+        elif isinstance(node, ast.FunctionDef):
+            self._visit_fndef(node)
+        elif isinstance(node, ast.Expr):
+            self._visit_expr(node)
+        elif hasattr(node, "body"):
+            self._visit_subtree(node)
+        else:
+            self._visit_leaf(node)
#!/usr/bin/env python
-
-from lbuild.contract import LunaBuildFile
-from lbuild.common import BuildEnvironment
-
-from lcfg.common import LConfigEnvironment
-from integration.config_io import CHeaderConfigProvider
-from integration.lbuild_bridge import LConfigProvider
-from integration.render_ishell import InteractiveShell
-from integration.build_gen import MakefileBuildGen, install_lbuild_functions
-from integration.lunamenu import menuconfig, TerminalSizeCheckFailed
-
-import lcfg.types as lcfg_type
-import lcfg.builtins as builtin
-
-from os import getcwd
-from os import mkdir
-from os.path import abspath, basename, dirname, exists
-from argparse import ArgumentParser
-from lib.utils import join_path
-
-def prepare_lconfig_env(out_dir):
- provider = CHeaderConfigProvider(join_path(out_dir, "configs.h"))
- env = LConfigEnvironment(getcwd(), provider)
-
- env.register_builtin_func(builtin.v)
- env.register_builtin_func(builtin.term_type)
- env.register_builtin_func(builtin.parent)
- env.register_builtin_func(builtin.default)
- env.register_builtin_func(builtin.include)
- env.register_builtin_func(builtin.env)
- env.register_builtin_func(builtin.set_value)
-
- env.type_factory().regitser(lcfg_type.PrimitiveType)
- env.type_factory().regitser(lcfg_type.MultipleChoiceType)
-
- return env
-
-def do_config(opt, lcfg_env):
- redo_config = not exists(opt.config_save) or opt.force
- if not redo_config or opt.quiet:
- return
-
- try:
- clean_quit = menuconfig(lcfg_env)
- except TerminalSizeCheckFailed as e:
- least = e.args[0]
- current = e.args[1]
- print(
- f"Your terminal size: {current} is less than minimum requirement of {least}.\n"
- "menuconfig will not function properly, switch to prompt based.\n")
-
- shell = InteractiveShell(lcfg_env)
- clean_quit = shell.render_loop()
+from argparse import ArgumentParser
+from pathlib import Path
+
+from lbuild.build import BuildEnvironment
+from lbuild.scope import ScopeProvider
+from lcfg2.builder import NodeBuilder
+from lcfg2.config import ConfigEnvironment
+
+from shared.export import ExportJsonFile
+from shared.export import ExportHeaderFile
+from shared.export import ExportMakefileRules
+from shared.export import restore_config_value
+from shared.scopes import ConfigScope
+from shared.build_gen import BuildScriptGenerator
+from shared.shconfig import shconfig
+
+class LunaBuild:
+    """Ties the config environment, the build environment and the exporters together."""
+
+    def __init__(self, options):
+        self.__lbuilder = BuildEnvironment()
+        self.__lconfig  = ConfigEnvironment()
+        self.__opt = options
+
+        scope = ConfigScope(self.__lconfig)
+        self.__lbuilder.register_scope(scope)
+
+        scope = ScopeProvider("src")
+        scope.file_subscope("c")
+        scope.file_subscope("h")
+        self.__lbuilder.register_scope(scope)
+
+        scope = ScopeProvider("flag")
+        scope.subscope("cc")
+        scope.subscope("ld")
+        self.__lbuilder.register_scope(scope)
+
+        self.__json  = ExportJsonFile(self.__lconfig)
+        self.__make  = ExportMakefileRules(self.__lconfig)
+        self.__headr = ExportHeaderFile(self.__lconfig)
+        self.__build = BuildScriptGenerator(self.__lbuilder)
+
+    def load(self):
+        """Build the config tree, load the build file, then refresh both."""
+        file = self.__opt.lconfig
+        NodeBuilder.build(self.__lconfig, file)
+
+        file = self.__opt.lbuild
+        self.__lbuilder.load(file)
+
+        self.__lconfig.refresh()
+        self.__lbuilder.update()
- if not clean_quit:
- print("Configuration aborted. Nothing has been saved.")
- exit(-1)
-
-def do_buildfile_gen(opts, lcfg_env):
- root_path = abspath(opts.root)
- ws_path = dirname(root_path)
- root_name = basename(root_path)
-
- mkgen = MakefileBuildGen(opts.out_dir)
- env = BuildEnvironment(ws_path, mkgen)
-
- install_lbuild_functions(env)
+    def restore(self):
+        """Restore saved values; return False when no save file exists."""
+        save = self.__opt.save
+        if not Path(save).exists():
+            return False
+
+        restore_config_value(self.__lconfig, save)
+        return True
+
+    def save(self):
+        save = self.__opt.save
+        self.__json.export(save)
+        return True
+
+    def generate(self):
+        """Write the requested build and config files into the export directory."""
+        outdir = Path(self.__opt.export_dir)
+        # parents/exist_ok: nested export paths work and a directory
+        # created in the meantime is not an error.
+        outdir.mkdir(parents=True, exist_ok=True)
- cfg_provider = LConfigProvider(lcfg_env)
- env.set_config_provider(cfg_provider)
+        if self.__opt.gen_build:
+            self.__build.generate(outdir / "build.mkinc")
- root = LunaBuildFile(env, root_name)
+        if self.__opt.gen_config:
+            self.__make.export(outdir / "config.mkinc")
+            self.__headr.export(outdir / "config.h")
- try:
- root.resolve()
- except Exception as err:
- print("failed to resolve root build file")
- raise err
-
- env.export()
+    def visual_config(self):
+        """Run shconfig; terminate with status 1 when it reports failure."""
+        if not shconfig(self.__lconfig):
+            print("configuration process aborted")
+            # Same effect as exit(1) without relying on the site module.
+            raise SystemExit(1)
def main():
+ # Command-line entry point: parse options, load LConfig/LBuild, restore
+ # saved values, run the interactive session, then save and generate.
 parser = ArgumentParser()
- parser.add_argument("--config", action="store_true", default=False)
- parser.add_argument("--quiet", action="store_true", default=False)
- parser.add_argument("--lconfig-file", default="LConfig")
- parser.add_argument("--config-save", default=".config.json")
- parser.add_argument("--force", action="store_true", default=False)
- parser.add_argument("root", nargs="?", default="LBuild")
- parser.add_argument("-o", "--out-dir", required=True)
+ parser.add_argument("--lconfig", default="LConfig")
+ parser.add_argument("--lbuild", default="LBuild")
+ parser.add_argument("--save", default=".config.json")
+ parser.add_argument("--gen-build", action="store_true", default=False)
+ parser.add_argument("--gen-config", action="store_true", default=False)
+ # positional: directory that receives the generated files
+ parser.add_argument("export_dir")
 opts = parser.parse_args()
- out_dir = opts.out_dir
- if not exists(out_dir):
- mkdir(out_dir)
+ builder = LunaBuild(opts)
+
+ builder.load()
+ # restore() returns False when no save file exists; the result is ignored
+ builder.restore()
- lcfg_env = prepare_lconfig_env(out_dir)
- require_config = exists(opts.lconfig_file)
- try:
- if require_config:
- lcfg_env.resolve_module(opts.lconfig_file)
- lcfg_env.update()
- lcfg_env.load()
- except Exception as e:
- print(e)
+ # exits the process with status 1 if the session is aborted
+ builder.visual_config()
- if opts.config:
- if require_config:
- do_config(opts, lcfg_env)
- else:
- print("No configuration file detected, skipping...")
-
- lcfg_env.update()
- lcfg_env.save(opts.config_save)
- lcfg_env.export()
-
- do_buildfile_gen(opts, lcfg_env)
+ builder.save()
+ builder.generate()
+
if __name__ == "__main__":
main()
\ No newline at end of file
--- /dev/null
+from os.path import isdir, exists
+from lbuild.build import BuildEnvironment
+from lbuild.scope import ScopeAccessor
+
+def gen_source_file(subscope : ScopeAccessor):
+    """Return the single make assignment listing every collected source."""
+    return ["BUILD_SRCS := " + " ".join(subscope.values())]
+
+def gen_header_file(subscope : ScopeAccessor):
+    """
+    Split the collected header entries into include directories and plain
+    header files and return one make assignment for each group.
+
+    Entries that do not exist on disk are reported and left out.
+    """
+    include_dirs = []
+    header_files = []
+
+    for path in subscope.values():
+        if isdir(path):
+            include_dirs.append(path)
+        elif exists(path):
+            header_files.append(path)
+        else:
+            print(f"warning: '{path}' does not exists, skipped")
+
+    inc_line = f"BUILD_INC := {' '.join(include_dirs)}"
+    hdr_line = f"BUILD_HDR := {' '.join(header_files)}"
+    return [inc_line, hdr_line]
+
+def gen_ccflags(subscope : ScopeAccessor):
+    """Return the single make assignment carrying all compiler flags."""
+    return ["BUILD_CFLAGS := " + " ".join(subscope.values())]
+
+def gen_ldflags(subscope : ScopeAccessor):
+    """Return the single make assignment carrying all linker flags."""
+    return ["BUILD_LDFLAGS := " + " ".join(subscope.values())]
+
+
+class BuildScriptGenerator:
+    """
+    Turns the values collected in the build scopes into makefile variable
+    assignments, using one generator function per `<scope>.<subscope>` pair.
+    """
+
+    def __init__(self, env: BuildEnvironment):
+        # scope name -> subscope name -> function(subscope) -> list of lines
+        self.__gen_policy = {
+            "src": {
+                "c": gen_source_file,
+                "h": gen_header_file
+            },
+            "flag": {
+                "cc": gen_ccflags,
+                "ld": gen_ldflags
+            }
+        }
+
+        self.__env = env
+
+    def __gen_lines(self, scope, subscope):
+        """
+        Return the output lines for one subscope, or an empty list (after
+        printing a warning) when no generator is registered for it.
+        """
+        per_scope = self.__gen_policy.get(scope.name)
+        if per_scope is None:
+            print(f"warning: no associated policy with '{scope.name}', skipped")
+            return []
+
+        policy = per_scope.get(subscope.name)
+        if policy is None:
+            print("warning: no associated policy with "
+                  f"'{scope.name}.{subscope.name}', skipped")
+            return []
+
+        return policy(subscope)
+
+    def generate(self, file):
+        """Write the lines of every registered scope/subscope to `file`."""
+        lines = []
+
+        for scope in self.__env.scopes.values():
+            for subscope in scope.accessors():
+                lines += self.__gen_lines(scope, subscope)
+
+        with open(file, 'w') as f:
+            f.write('\n'.join(lines))
+
\ No newline at end of file
--- /dev/null
+from lcfg2.config import ConfigEnvironment
+from lcfg2.common import NodeProperty
+from lib.utils import SourceLogger
+import json
+import re
+
+# One leading letter or underscore followed by any number (including zero)
+# of letters, digits or underscores, so one-character names match as well.
+Identifier = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
+
+class Exporter:
+    """
+    Base exporter: walks every term of the config environment and writes one
+    formatted line per (name, value) variant. Subclasses supply the line
+    format and the value rendering.
+    """
+
+    def __init__(self, env: ConfigEnvironment):
+        self._env = env
+
+    def export(self, file):
+        """Render all terms and write them to `file`, newline separated."""
+        rendered = [
+            self._format_line(term, key, value)
+            for term in self._env.terms()
+            for key, value in self.variants(term)
+        ]
+
+        with open(file, 'w') as out:
+            out.write('\n'.join(rendered))
+
+    def _format_line(self, node, name, value):
+        """Hook: text of one output line; the base class yields nothing."""
+        return ""
+
+    def variants(self, node):
+        """Return the (exported name, value) pairs for `node`."""
+        value = self.get_value(node)
+        return [(f"CONFIG_{node._name.upper()}", value)]
+
+    def get_value(self, node):
+        """Hook: rendered value of `node`; the base class yields nothing."""
+        return ""
+
+
+class ExportMakefileRules(Exporter):
+    """Renders config terms as makefile `NAME := value` assignments."""
+
+    def _format_line(self, node, name, value):
+        if node.enabled():
+            return f"{name} := {value}"
+        return f"# {name} is disabled"
+
+    def get_value(self, node):
+        tok = NodeProperty.Token[node]
+        val = NodeProperty.Value[node]
+        kind = type(val)
+
+        # type identity is compared, so a bool never takes the int branch
+        if kind is str or kind is int:
+            return val
+        if kind is bool:
+            return ""
+
+        SourceLogger.warn(node, tok,
+            f"value: '{val}' can not mapped to makefile primitives")
+        return "## ERROR ##"
+
+class ExportHeaderFile(Exporter):
+    """Renders config terms as C preprocessor `#define` lines."""
+
+    def _format_line(self, node, name, value):
+        if node.enabled():
+            return f"#define {name} {value}"
+        return f"// {name} is disabled"
+
+    def get_value(self, node):
+        tok = NodeProperty.Token[node]
+        val = NodeProperty.Value[node]
+        kind = type(val)
+
+        if kind is str:
+            # wrapped in double quotes as-is; the text itself is not escaped
+            return f'"{val}"'
+        if kind is int:
+            return val
+        if kind is bool:
+            return ""
+
+        SourceLogger.warn(node, tok,
+            f"value: '{val}' can not mapped to C primitives")
+        return "// Error"
+
+class ExportJsonFile(Exporter):
+    """Dumps every term as one JSON object mapping node name to value."""
+
+    def export(self, file):
+        snapshot = {
+            term._name: NodeProperty.Value[term]
+            for term in self._env.terms()
+        }
+
+        with open(file, 'w') as out:
+            json.dump(snapshot, out)
+
+def restore_config_value(env: ConfigEnvironment, json_path):
+    """
+    Apply the values stored in a JSON save file to the matching config nodes.
+
+    The file holds one object mapping node names to values. Names without a
+    matching node are reported and skipped. The environment is refreshed
+    once after all values have been applied.
+    """
+    with open(json_path) as f:
+        saved = json.load(f)
+
+    for name, value in saved.items():
+        node = env.get_node(name)
+        if node is None:
+            # report the offending key; `node` itself is None on this path
+            print(f"warning: missing node: '{name}', skipped")
+            continue
+
+        NodeProperty.Value[node] = value
+
+    env.refresh()
\ No newline at end of file
--- /dev/null
+from lbuild.scope import ScopeProvider
+from lcfg2.common import NodeProperty
+
+class ConfigScope(ScopeProvider):
+    """
+    Scope registered under the name "config" that resolves subscript access
+    to the current value of the config node with that name.
+    """
+
+    def __init__(self, env):
+        super().__init__("config")
+        self.__env = env
+
+    def __getitem__(self, name):
+        node = self.__env.get_node(name)
+        if node is not None:
+            return NodeProperty.Value[node]
+
+        raise Exception(f"config '{name}' not exists")
\ No newline at end of file
--- /dev/null
+from lcfg2.config import ConfigEnvironment
+from lcfg2.common import NodeProperty, NodeDependency
+
+import pydoc
+import readline, textwrap
+
+class ShconfigException(Exception):
+    """Error raised by shconfig commands; str() yields its first argument."""
+
+    def __str__(self):
+        return str(self.args[0])
+
+def select(val, _if, _else):
+    """Return `_if` when `val` is truthy, `_else` otherwise."""
+    if val:
+        return _if
+    return _else
+
+def get_node(env: ConfigEnvironment, name: str):
+    """
+    Look up a config node from its user-facing name.
+
+    A leading "CONFIG_" is dropped and the rest lower-cased before the
+    lookup. Raises ShconfigException when no such node exists.
+    """
+    key = name.removeprefix("CONFIG_").lower()
+    found = env.get_node(key)
+    if found is not None:
+        return found
+
+    raise ShconfigException(f"no such config: {name}")
+
+def show_configs(env: ConfigEnvironment):
+    """
+    Show a legend followed by one line per config term (flags, name, value)
+    through the pydoc pager.
+    """
+    aligned = 40
+    lines = [
+        "Display format:",
+        "",
+        " (flags) CONFIG_NAME ..... VALUE",
+        " ",
+        " (flags)",
+        " x Config is disabled",
+        " r Read-Only config",
+        " h Hidden config",
+        "",
+        "",
+        "Defined configuration terms",
+        ""
+    ]
+
+    for node in env.terms():
+        name = f"CONFIG_{node._name.upper()}"
+        value = NodeProperty.Value[node]
+        enabled = NodeProperty.Enabled[node]
+        hidden = NodeProperty.Hidden[node]
+        ro = NodeProperty.Readonly[node]
+
+        # three flag columns: disabled / read-only / hidden, '.' when unset
+        status = (
+            f"{select(not enabled, 'x', '.')}"
+            f"{select(ro, 'r', '.')}"
+            f"{select(hidden, 'h', '.')}"
+        )
+
+        val_txt = '<?>' if value is None else f"{value}"
+
+        line = f"[{status}] {name}"
+        # dotted leader up to the value column, never shorter than 4 dots
+        to_pad = max(aligned - len(line), 4)
+        lines.append(f" {line} {'.' * to_pad} {val_txt}")
+
+    pydoc.pager("\n".join(lines))
+
+def set_config(env: ConfigEnvironment, name: str, value):
+    """
+    Assign `value` to the config named `name`, then refresh the environment.
+
+    Raises ShconfigException when no such config exists.
+    """
+    node = get_node(env, name)
+    NodeProperty.Value[node] = value
+    env.refresh()
+
+def __print_dep_recursive(env, node, level = 0):
+    """
+    Print `node` with its value, then its dependency expression and,
+    one level deeper, every node named by that dependency.
+    """
+    pad = " " * (level * 4)
+    dep: NodeDependency = NodeProperty.Dependency[node]
+
+    print(pad, f"+ {node._name} -> {NodeProperty.Value[node]}")
+    if dep is not None:
+        print(pad, f"= {dep._expr}")
+        for dep_name in dep._names:
+            __print_dep_recursive(env, env.get_node(dep_name), level + 1)
+
+def show_dependency(env: ConfigEnvironment, name: str):
+    """
+    Print the name of the config `name`, then its dependency tree.
+
+    Raises ShconfigException when no such config exists.
+    """
+    root = get_node(env, name)
+    print(root._name)
+    __print_dep_recursive(env, root)
+
+def _cmd_dep(env, *args):
+    """Handle `dep <config>`: print the dependency chain of one config."""
+    if not args:
+        # without this check a bare `dep` fails with a TypeError that the
+        # shconfig loop does not treat as a user error
+        raise ShconfigException("dep: expect a config name")
+    show_dependency(env, args[0])
+
+# command word -> handler(env, *words following the command)
+CMDS = {
+    "show": lambda env, *args: show_configs(env),
+    "dep": _cmd_dep
+}
+
+def next_input(env: ConfigEnvironment):
+    """
+    Read one line from the prompt and execute it.
+
+    Returns False when the user asks to leave (`q` or `exit`) and True in
+    every other case, including a blank line. Raises ShconfigException for
+    a malformed assignment or an unknown command.
+    """
+    line = input("shconfig> ")
+    # split() without a separator drops empty fields: a blank line gives no
+    # parts at all and repeated spaces between words are tolerated
+    parts = line.split()
+
+    if not parts:
+        return True
+
+    head, *rest = parts
+
+    if head in ('q', 'exit'):
+        return False
+
+    if head.startswith("CONFIG_"):
+        # assignment form: exactly one value word must follow the name
+        if len(rest) != 1:
+            raise ShconfigException("expect a value")
+        set_config(env, head, rest[0])
+        return True
+
+    cmd = CMDS.get(head)
+    if cmd is None:
+        raise ShconfigException(f"unknow command: {head}")
+
+    cmd(env, *rest)
+    return True
+
+# Interactive configuration prompt: prints a usage banner, then processes
+# input lines until the user quits (returns True) or interrupts with
+# Ctrl-C (returns False).
+def shconfig(env: ConfigEnvironment):
+ print(textwrap.dedent(
+ """
+ Lunaix shconfig
+
+ Usage:
+ show
+ List all configuration
+
+ dep <config>
+ Examine dependency chain for <config>
+
+ <config> <value>
+ Update value of <config> to <value>
+
+ """
+ ).strip())
+ print()
+
+ while True:
+ try:
+ # next_input() returns False once the user asked to quit
+ if not next_input(env):
+ return True
+ except ShconfigException as e:
+ # user-level error: show the message and keep prompting
+ print(str(e))
+ continue
+ except KeyboardInterrupt as e:
+ return False
+ # NOTE(review): this handler only re-raises, so any other exception
+ # still propagates to the caller
+ except Exception as e:
+ raise e
+ # NOTE(review): appears unreachable; every path above returns, continues
+ # or raises
+ return False
-@Term("Architecture")
-def arch():
+@"Architecture"
+def arch() -> "i386" | "x86_64" | "aarch64" | "rv64":
 """
 set the ISA target
 """
- type(["i386", "x86_64", "aarch64", "rv64"])
- default("i386")
- env_val = env("ARCH")
- if env_val is not None:
- set_value(env_val)
\ No newline at end of file
+ # env("ARCH") presumably reads the ARCH environment variable (confirm
+ # against the LConfig runtime); a missing or empty result falls back to
+ # "x86_64"
+ _arch = env("ARCH")
+ return _arch if _arch else "x86_64"