format code (#174)

This commit is contained in:
Septs 2021-06-08 08:26:26 +08:00 committed by GitHub
parent 54c02595ea
commit fb8fece6f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 38 deletions

View File

@ -3,18 +3,21 @@ import sys
from pathlib import Path
import toml
from rfc2317 import gen_reverse_pointers
RESOLVE_FILE = Path("dns", "db.10.127")
RFC2317_FILE = Path("dns", "rfc2317.toml")
def iter_rfc2317_entry():
entries = toml.loads(RFC2317_FILE.read_text())
for (route, attributes) in entries.items():
ns = attributes.get('NS')
ds = attributes.get('DS', list())
ttl = attributes.get('TTL', -1)
yield(route, ns, ds, ttl)
ns = attributes.get("NS")
ds = attributes.get("DS", list())
ttl = attributes.get("TTL", -1)
yield (route, ns, ds, ttl)
def main():
orignal = RESOLVE_FILE.read_text()

View File

@ -6,11 +6,11 @@ import argparse
from pathlib import Path
if __name__ == "__main__":
parser = argparse.ArgumentParser('named-formatzone')
parser = argparse.ArgumentParser("named-formatzone")
parser.add_argument("file")
args = parser.parse_args()
zonefile = Path(args.file)
zonelines = zonefile.read_text().split('\n')
zonelines = zonefile.read_text().split("\n")
formatted = list()
max_length = [0, 0, 0, 0, 0]
in_soa = False
@ -18,7 +18,7 @@ if __name__ == "__main__":
def iter_lines(scan_only=True):
soafound = None
for rline in zonelines:
line, *comments = rline.split(';')
line, *comments = rline.split(";")
comments = ";".join(comments)
line = line.strip()
if "SOA" in line and soafound is None:
@ -41,7 +41,7 @@ if __name__ == "__main__":
else:
fmtlline = list()
for i, entry in enumerate(cols):
entry += " "*(max_length[i]-len(entry)+3)
entry += " " * (max_length[i] - len(entry) + 3)
if entry:
fmtlline.append(entry)
fmtline = " ".join(fmtlline)
@ -50,6 +50,7 @@ if __name__ == "__main__":
else:
if not scan_only:
formatted.append(rline)
iter_lines()
iter_lines(False)

View File

@ -2,12 +2,15 @@
import ipaddress
ZONE = '.127.10.in-addr.arpa'
ZONE = ".127.10.in-addr.arpa"
def truncate(rev: str) -> str:
assert rev.endswith(ZONE)
rev = rev[:-len(ZONE)]
rev = rev[: -len(ZONE)]
return rev
def gen_reverse_pointers(network: str, ns: list, ds: list = [], ttl: int = -1) -> list:
ttl = f"{ttl} " if 900 <= ttl <= 86400 else ""
buf = list()
@ -25,5 +28,17 @@ def gen_reverse_pointers(network: str, ns: list, ds: list = [], ttl: int = -1) -
buf.append(f"{cnamefr} {ttl}IN CNAME {cnameto}")
return buf
if __name__ == "__main__":
print("\n".join(gen_reverse_pointers('10.127.8.64/26', ['ns1.jerry.neo.'], ['18792 13 2 2F335456EEE70FC4833886E5EEDC28E7195E90E2A337860B3E805D5EB9F3A804'], ttl=1500)))
print(
"\n".join(
gen_reverse_pointers(
"10.127.8.64/26",
["ns1.jerry.neo."],
[
"18792 13 2 2F335456EEE70FC4833886E5EEDC28E7195E90E2A337860B3E805D5EB9F3A804"
],
ttl=1500,
)
)
)

View File

@ -3,22 +3,22 @@ import argparse
import json
import re
import time
# dnssec
from base64 import b64decode
from collections import defaultdict
from contextlib import redirect_stdout
from functools import wraps
from io import StringIO
from ipaddress import IPv4Network, IPv6Network, ip_network
from itertools import combinations
from pathlib import Path
from functools import wraps
import netaddr
import toml
from tabulate import tabulate
# dnssec
from base64 import b64decode
from dns.dnssec import make_ds
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from tabulate import tabulate
NEO_NETWORK_POOL = [ip_network("10.127.0.0/16"), ip_network("fd10:127::/32")]
@ -66,8 +66,11 @@ def iter_toml_file(path: str):
def _sort_as_iterator(func):
@wraps(func)
def wrapped(*args, **kwargs):
for item in sorted(list(func(*args, **kwargs)), key=lambda x: x[0], reverse=False):
for item in sorted(
list(func(*args, **kwargs)), key=lambda x: x[0], reverse=False
):
yield item
return wrapped
@ -144,7 +147,9 @@ def route_to_roa(asn_table: dict):
try:
assert net1["prefix"] != net2["prefix"]
except AssertionError:
assert net1['asn'] != net2['asn'] and entity_from_net(net1) == entity_from_net(net2)
assert net1["asn"] != net2["asn"] and entity_from_net(
net1
) == entity_from_net(net2)
continue
assert net1["prefix"].supernet_of(net2["prefix"])
s1net, s2net = (net1["supernet"], net2["supernet"])
@ -181,10 +186,19 @@ def prehandle_roa(asn_table: dict, args):
r["prefix"] = r["prefix"].with_prefixlen
return roa4, roa6
def export_dnssec_dnskey():
def ds_from_dnskey(zone, flags, protocol, algorithm, *key):
dnspy_dnskey = DNSKEY("IN", "DNSKEY", int(flags), int(protocol), int(algorithm), b64decode(" ".join(key)))
dnspy_dnskey = DNSKEY(
"IN",
"DNSKEY",
int(flags),
int(protocol),
int(algorithm),
b64decode(" ".join(key)),
)
return make_ds(zone, dnspy_dnskey, "SHA256").to_text()
dnskey_path = Path("dns") / "dnssec"
dnskeys = list()
for f in dnskey_path.iterdir():
@ -199,14 +213,17 @@ def export_dnssec_dnskey():
zonekey["zone"] = zone
else:
assert zonekey["zone"] == zone
zonekey["records"].append({
"dnskey": " ".join(dnskey),
"ds": ds_from_dnskey(zone, *dnskey),
})
zonekey["records"].append(
{
"dnskey": " ".join(dnskey),
"ds": ds_from_dnskey(zone, *dnskey),
}
)
if zonekey["zone"]:
dnskeys.append(zonekey)
return dnskeys
def make_export(roa4, roa6):
def modify_entity(entity):
entity["nic_hdl"] = name_to_nic_hdl(entity["name"])
@ -245,7 +262,7 @@ def make_export(roa4, roa6):
}
for owner, entity in entities.items()
},
"dnssec": export_dnssec_dnskey()
"dnssec": export_dnssec_dnskey(),
}
return json.dumps(output, indent=2)
@ -274,7 +291,10 @@ def make_rfc8416(roa4, roa6):
"bgpsecAssertions": [],
"prefixAssertions": [
pick(
roa, ["asn", "prefix"], maxLength="maxPrefixLength", name="comment",
roa,
["asn", "prefix"],
maxLength="maxPrefixLength",
name="comment",
)
for roa in (*roa4, *roa6)
],
@ -382,19 +402,51 @@ def make_summary():
print(prefix)
print("```")
IP_VRSIONS = {4, 6}
total_ip_count = {ver: sum([prefix.num_addresses for prefix in NEO_NETWORK_POOL if prefix.version == ver]) for ver in IP_VRSIONS}
used_ip_count = {ver: sum([ip_network(str(prefix)).num_addresses for prefix in prefixes if prefix.version == ver]) for ver in IP_VRSIONS}
total_ip_count = {
ver: sum(
[
prefix.num_addresses
for prefix in NEO_NETWORK_POOL
if prefix.version == ver
]
)
for ver in IP_VRSIONS
}
used_ip_count = {
ver: sum(
[
ip_network(str(prefix)).num_addresses
for prefix in prefixes
if prefix.version == ver
]
)
for ver in IP_VRSIONS
}
print()
print("## Address Space Usage")
print()
address_space_usage_table = tabulate(
(
(f"IPv{ver}", f"{(t:=total_ip_count.get(ver)):.5g}", f"{(u:=used_ip_count.get(ver)):.5g}", f"{t-u:.5g}", f"{u/t*100:.2f}%", f"{(t-u)/t*100:.2f}%")
(
f"IPv{ver}",
f"{(t:=total_ip_count.get(ver)):.5g}",
f"{(u:=used_ip_count.get(ver)):.5g}",
f"{t-u:.5g}",
f"{u/t*100:.2f}%",
f"{(t-u)/t*100:.2f}%",
)
for ver in IP_VRSIONS
),
headers=["IP Version", "Total", "Used", "Free", "Percent Used", "Percent Free"],
headers=[
"IP Version",
"Total",
"Used",
"Free",
"Percent Used",
"Percent Free",
],
tablefmt="github",
disable_numparse=True
disable_numparse=True,
)
print(address_space_usage_table)
return stream.getvalue()

View File

@ -1,23 +1,24 @@
#!/usr/bin/env python3
from pathlib import Path
import subprocess
from time import time
from re import match
from os import chdir
from pathlib import Path
from re import match
from time import time
zone_files = [
'neonetwork',
'db.10.127',
'db.fd10.127',
"neonetwork",
"db.10.127",
"db.fd10.127",
]
serial_base = 1586876035
new_serial = int(time()) - serial_base
def update_serial_to(zone: Path, serial: int = 0) -> int:
lines = zone.read_text().split("\n")
processed = list()
assert 0 <= serial <= 2**32
assert 0 <= serial <= 2 ** 32
found = False
old_serial = None
for line in lines:
@ -34,6 +35,7 @@ def update_serial_to(zone: Path, serial: int = 0) -> int:
zone.write_text("\n".join(processed))
return old_serial
for zone in zone_files:
gen_zone = Path("generated") / "dns" / zone
repo_zone = Path("dns") / zone
@ -42,7 +44,9 @@ for zone in zone_files:
old_serial = update_serial_to(gen_zone)
update_serial_to(repo_zone, old_serial)
gen_zone.write_text(repo_zone.read_text())
p = subprocess.run(['git', 'diff', '--exit-code', gen_zone.name], cwd=gen_zone.parent)
p = subprocess.run(
["git", "diff", "--exit-code", gen_zone.name], cwd=gen_zone.parent
)
if p.returncode == 0:
print(f"skip {repo_zone.name}")
else: