Parser: init
This commit is contained in:
parent
d42e924d68
commit
866daa51ba
1 changed files with 144 additions and 0 deletions
144
src/main.py
Normal file
144
src/main.py
Normal file
|
@ -0,0 +1,144 @@
|
|||
import yaml
|
||||
import argparse
|
||||
import re
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
parser = argparse.ArgumentParser(description="Process some integers.")
|
||||
parser.add_argument("file", type=Path, help="yaml config")
|
||||
parser.add_argument("--json", action="store_true", help="output as json")
|
||||
parser.add_argument("--out", type=Path, help="write into file")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
# ╔───────────────────────────────────────────────────────────────────────────╗
|
||||
# │ Λssεrτiδηs |
|
||||
# ╚───────────────────────────────────────────────────────────────────────────╝
|
||||
def check_for_duplicates(host, props):
|
||||
seen = set()
|
||||
|
||||
for v in props:
|
||||
if v in seen:
|
||||
raise SyntaxError(f"Duplicate key on `{host}`: `{v}`")
|
||||
else:
|
||||
seen.add(v)
|
||||
|
||||
|
||||
def check_duplicate_hosts(data):
|
||||
hosts = set()
|
||||
|
||||
for host in data:
|
||||
if host in hosts:
|
||||
raise SyntaxError(f"Duplicate host: `{host}`")
|
||||
hosts.add(host)
|
||||
|
||||
|
||||
def run_assertions(host, props):
|
||||
check_for_duplicates(host, props)
|
||||
|
||||
|
||||
# ╔───────────────────────────────────────────────────────────────────────────╗
|
||||
# │ Pαrsεr |
|
||||
# ╚───────────────────────────────────────────────────────────────────────────╝
|
||||
def expand_for_loop(props):
|
||||
expand = list()
|
||||
|
||||
for loop in props.get("for", []):
|
||||
r = loop["range"]
|
||||
|
||||
if isinstance(r, dict):
|
||||
start = int(r["from"])
|
||||
cease = int(r["to"])
|
||||
inc = int(r.get("increment", 1))
|
||||
|
||||
iterations = range(start, cease, inc)
|
||||
elif isinstance(r, list):
|
||||
iterations = r
|
||||
else:
|
||||
raise TypeError("Expected list or dict in `for` loop `range`")
|
||||
|
||||
pattern = re.compile(rf'\$\{{{loop["variable"]}\}}')
|
||||
|
||||
for i in iterations:
|
||||
expand.append(pattern.sub(str(i), loop["template"]))
|
||||
|
||||
return expand
|
||||
|
||||
|
||||
def get_ssh_props(data):
|
||||
props = list()
|
||||
|
||||
for k, v in data["ssh_props"].items():
|
||||
if isinstance(v, list):
|
||||
for x in v:
|
||||
props.append(f"{k} {x}")
|
||||
else:
|
||||
props.append(f"{k} {v}")
|
||||
|
||||
return props
|
||||
|
||||
|
||||
def stringify_booleans(props):
|
||||
pattern = re.compile(r"(True|False)$")
|
||||
|
||||
def replace_bools(s):
|
||||
m = pattern.search(s)
|
||||
|
||||
if m and m[1] == "True":
|
||||
return pattern.sub("true", s)
|
||||
elif m and m[1] == "False":
|
||||
return pattern.sub("false", s)
|
||||
else:
|
||||
return s
|
||||
|
||||
return [replace_bools(prop) for prop in props]
|
||||
|
||||
|
||||
def parse_yaml(data):
|
||||
expanded = dict()
|
||||
check_duplicate_hosts(data)
|
||||
|
||||
for host, props in data.items():
|
||||
parsed = list()
|
||||
parsed.extend(get_ssh_props(props))
|
||||
parsed.extend(expand_for_loop(props))
|
||||
|
||||
parsed = stringify_booleans(parsed)
|
||||
run_assertions(host, parsed)
|
||||
|
||||
expanded[host] = sorted(parsed)
|
||||
|
||||
return expanded
|
||||
|
||||
|
||||
# ╔───────────────────────────────────────────────────────────────────────────╗
|
||||
# │ Fδrmαττεr |
|
||||
# ╚───────────────────────────────────────────────────────────────────────────╝
|
||||
def to_ssh_config_string(parse):
|
||||
s = str()
|
||||
|
||||
for host, keys in parse.items():
|
||||
s += f"Host {host}\n"
|
||||
for key in keys:
|
||||
s += f"\t{key}\n"
|
||||
s += "\n"
|
||||
|
||||
return s
|
||||
|
||||
|
||||
# ╔───────────────────────────────────────────────────────────────────────────╗
|
||||
# │ Mαiη |
|
||||
# ╚───────────────────────────────────────────────────────────────────────────╝
|
||||
with open(args.file, "r") as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
parse = parse_yaml(data)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(parse))
|
||||
elif args.out:
|
||||
with open(args.out, 'w') as f:
|
||||
f.write(to_ssh_config_string(parse))
|
||||
else:
|
||||
print(to_ssh_config_string(parse), end="")
|
Loading…
Reference in a new issue