mirror of
https://github.com/NeoCloud/NeoNetwork
synced 2025-01-12 18:49:23 +08:00
roa.py: export format
This commit is contained in:
parent
61024759a3
commit
24cd86065b
2 changed files with 28 additions and 6 deletions
|
@ -1,4 +1,4 @@
|
|||
TYPE="LO"
|
||||
NAME="frank-hsinchu"
|
||||
NAME="frank-hsinchu-1"
|
||||
DESC="Ubuntu/bird"
|
||||
ASN="4201270004"
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
from pathlib import Path
|
||||
from ipaddress import IPv4Network, IPv6Network
|
||||
from itertools import combinations
|
||||
|
||||
import re
|
||||
|
||||
class BashParser:
|
||||
def __init__(self):
|
||||
|
@ -53,7 +53,16 @@ def str2asn(s_asn):
|
|||
return int(s_asn)
|
||||
|
||||
|
||||
def name2nichdl(name):
|
||||
r, num = re.subn(r'[^0-9A-Z]', '-', name.upper())
|
||||
_r = len(r.replace('-', ''))
|
||||
assert _r >= 3 # has at least 3 effective chars
|
||||
assert r[0] != '-' # starts with [0-9A-Z]
|
||||
assert num < _r # not too many subs
|
||||
return r
|
||||
|
||||
def neoneo_get_people():
|
||||
nic_hdl_names = set()
|
||||
people = dict()
|
||||
for f in (cwd / "entity").iterdir():
|
||||
try:
|
||||
|
@ -63,6 +72,10 @@ def neoneo_get_people():
|
|||
present_keys = ('name', 'desc', 'contact', 'babel')
|
||||
assert f.name
|
||||
people[f.name] = {k: fc.get(k) for k in present_keys}
|
||||
nic_hdl = name2nichdl(f.name)
|
||||
assert nic_hdl not in nic_hdl_names
|
||||
nic_hdl_names.add('nic_hdl')
|
||||
people[f.name]['nic_hdl'] = nic_hdl
|
||||
for v in people[f.name].values():
|
||||
assert v is not None
|
||||
except Exception:
|
||||
|
@ -105,6 +118,7 @@ def node2asn():
|
|||
NODE_TABLE = node2asn()
|
||||
|
||||
def neonet_route2roa(dirname, is_ipv6=False):
|
||||
net_names = set()
|
||||
roa_entries = list()
|
||||
for f in (cwd / dirname).iterdir():
|
||||
try:
|
||||
|
@ -113,20 +127,26 @@ def neonet_route2roa(dirname, is_ipv6=False):
|
|||
fc = shell2dict(f.read_text())
|
||||
nettype = IPv6Network if is_ipv6 else IPv4Network
|
||||
get_supernet = lambda s_net: None if not s_net else nettype(s_net, strict=True)
|
||||
roa_entries_key = ("asn", "prefix", "supernet")
|
||||
roa_entries_key = ("asn", "prefix", "supernet", "netname")
|
||||
if fc.get('type').lower() in ('lo', 'subnet'):
|
||||
asn = str2asn(fc.get('asn'))
|
||||
assert asn in ASNS
|
||||
route = f.name.replace(',', '/')
|
||||
supernet = get_supernet(fc.get('supernet'))
|
||||
roa_entries.append(dict(zip(roa_entries_key, [asn, nettype(route, strict=True), supernet])))
|
||||
netname = name2nichdl(fc.get('name'))
|
||||
assert netname not in net_names
|
||||
net_names.add(netname)
|
||||
roa_entries.append(dict(zip(roa_entries_key, [asn, nettype(route, strict=True), supernet, netname])))
|
||||
elif fc.get('type').lower().startswith('tun'):
|
||||
assert NODE_TABLE[fc.get('downstream')] # extra check for downstream
|
||||
asn = NODE_TABLE[fc.get('upstream')]
|
||||
assert asn in ASNS
|
||||
route = f.name.replace(',', '/')
|
||||
supernet = get_supernet(fc.get('supernet'))
|
||||
roa_entries.append(dict(zip(roa_entries_key, [asn, nettype(route, strict=True), supernet])))
|
||||
netname = name2nichdl("%s-%s" % (fc.get('type'), route))
|
||||
assert netname not in net_names
|
||||
net_names.add(netname)
|
||||
roa_entries.append(dict(zip(roa_entries_key, [asn, nettype(route, strict=True), supernet, netname])))
|
||||
else:
|
||||
assert fc.get('type').lower() in ('ptp',)
|
||||
except Exception:
|
||||
|
@ -193,13 +213,15 @@ if __name__ == "__main__":
|
|||
as_route4 = list()
|
||||
as_route6 = list()
|
||||
vkeys = [k for k in VALID_KEYS if k != 'asn']
|
||||
vkeys.append('netname')
|
||||
for roa, as_route in ((roa4, as_route4), (roa6, as_route6)):
|
||||
for r in roa:
|
||||
if r['asn'] == asn:
|
||||
as_route.append({k:v for k, v in r.items() if k in vkeys})
|
||||
owner = asi['owner']
|
||||
peopledict = d_output['people'].setdefault(owner, {"info": PEOPLE[owner], "asns": list()})
|
||||
peopledict['asns'].append({"asn": asn, "routes": {'ipv4': as_route4, 'ipv6': as_route6}})
|
||||
peopledict['asns'].append({"asn": asn, **{k:v for k, v in ASNS[asn].items() if k != 'owner'},
|
||||
"routes": {'ipv4': as_route4, 'ipv6': as_route6}})
|
||||
output = json.dumps(d_output, indent=2)
|
||||
elif args.json:
|
||||
import json, time
|
||||
|
|
Loading…
Reference in a new issue