mirror of
https://github.com/NeoCloud/NeoNetwork
synced 2024-12-26 10:59:24 +08:00
add roa tool
This commit is contained in:
parent
cff81940b6
commit
c98a51ba61
1 changed files with 127 additions and 0 deletions
127
scripts/roa.py
Executable file
127
scripts/roa.py
Executable file
|
@ -0,0 +1,127 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from ipaddress import IPv4Network, IPv6Network
|
||||||
|
from itertools import combinations
|
||||||
|
|
||||||
|
def keyVal(line):
|
||||||
|
l = line.split('=')
|
||||||
|
assert len(l) > 1
|
||||||
|
repl_quotes = lambda t: t.replace('"', '').replace('\'', '')
|
||||||
|
return [l[0].strip(), '='.join([repl_quotes(i).strip() for i in l[1:]])]
|
||||||
|
|
||||||
|
cwd = Path()
|
||||||
|
assert not [d for d in ("asn", "route", "route6") if not (cwd / d).is_dir()]
|
||||||
|
|
||||||
|
def get_asns():
|
||||||
|
asns = list()
|
||||||
|
for f in (cwd / "asn").iterdir():
|
||||||
|
try:
|
||||||
|
if not f.is_file():
|
||||||
|
continue
|
||||||
|
assert f.name.lower().startswith('as')
|
||||||
|
asns.append(int(f.name[2:]))
|
||||||
|
except Exception:
|
||||||
|
print("[!] Error while processing file", f)
|
||||||
|
raise
|
||||||
|
return asns
|
||||||
|
|
||||||
|
ASNS = get_asns()
|
||||||
|
|
||||||
|
def route2roa(dirname, is_ipv6=False):
|
||||||
|
roa_entries = list()
|
||||||
|
for f in (cwd / dirname).iterdir():
|
||||||
|
try:
|
||||||
|
if not f.is_file():
|
||||||
|
continue
|
||||||
|
t = f.read_text()
|
||||||
|
lines = t.split('\n')
|
||||||
|
fc = dict()
|
||||||
|
for line in lines:
|
||||||
|
l = line.strip()
|
||||||
|
if not l or l.startswith('#'):
|
||||||
|
continue
|
||||||
|
key, val = keyVal(l)
|
||||||
|
fc[key.lower()] = val.lower()
|
||||||
|
nettype = IPv6Network if is_ipv6 else IPv4Network
|
||||||
|
if fc.get('type') in ('lo', 'subnet'):
|
||||||
|
asn = int(fc.get('as'))
|
||||||
|
assert asn in ASNS
|
||||||
|
route = f.name.replace(',', '/')
|
||||||
|
roa_entries.append([asn, nettype(route, strict=True)])
|
||||||
|
elif fc.get('type').startswith('tun'):
|
||||||
|
asn = int(fc.get('upstream').split(':')[1])
|
||||||
|
assert asn in ASNS
|
||||||
|
route = f.name.replace(',', '/')
|
||||||
|
roa_entries.append([asn, nettype(route, strict=True)])
|
||||||
|
else:
|
||||||
|
assert fc.get('type') in ('ptp',)
|
||||||
|
except Exception:
|
||||||
|
print("[!] Error while processing file", f)
|
||||||
|
raise
|
||||||
|
roa_entries.sort(key=lambda l: l[0])
|
||||||
|
for en1, en2 in combinations(roa_entries, 2):
|
||||||
|
if en1[1].overlaps(en2[1]):
|
||||||
|
print("[!] Error: found", en1[1], "overlaps", en2[1])
|
||||||
|
raise AssertionError
|
||||||
|
return roa_entries
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
parser = argparse.ArgumentParser(description='NeoNetwork ROA tool')
|
||||||
|
parser.add_argument('-m', '--max', type=int, default=30, help='set ipv4 max prefix length')
|
||||||
|
parser.add_argument('-M', '--max6', type=int, default=64, help='set ipv6 max prefix length')
|
||||||
|
parser.add_argument('-j', '--json', action='store_true', help='output json')
|
||||||
|
parser.add_argument('-o', '--output', default='', help='write output to file')
|
||||||
|
parser.add_argument('-4', '--ipv4', action='store_true', help='print ipv4 only')
|
||||||
|
parser.add_argument('-6', '--ipv6', action='store_true', help='print ipv6 only')
|
||||||
|
args = parser.parse_args()
|
||||||
|
if args.max < 0 or args.max6 < 0 or args.max > IPv4Network(0).max_prefixlen or args.max6 > IPv6Network(0).max_prefixlen:
|
||||||
|
parser.error('check your max prefix length')
|
||||||
|
|
||||||
|
roa4 = roa6 = list()
|
||||||
|
if args.ipv4:
|
||||||
|
roa4 = route2roa('route')
|
||||||
|
elif args.ipv6:
|
||||||
|
roa6 = route2roa('route6', True)
|
||||||
|
else:
|
||||||
|
roa4 = route2roa('route')
|
||||||
|
roa6 = route2roa('route6', True)
|
||||||
|
|
||||||
|
roa4 = [r for r in roa4 if r[1].prefixlen <= args.max or r[1].prefixlen == IPv4Network(0).max_prefixlen]
|
||||||
|
roa6 = [r for r in roa6 if r[1].prefixlen <= args.max6]
|
||||||
|
|
||||||
|
for r in roa4:
|
||||||
|
if r[1].prefixlen == IPv4Network(0).max_prefixlen:
|
||||||
|
r.append(IPv4Network(0).max_prefixlen)
|
||||||
|
else:
|
||||||
|
r.append(args.max)
|
||||||
|
r[1] = r[1].with_prefixlen
|
||||||
|
for r in roa6:
|
||||||
|
r.append(args.max6)
|
||||||
|
r[1] = r[1].with_prefixlen
|
||||||
|
|
||||||
|
output = ""
|
||||||
|
if args.json:
|
||||||
|
import json
|
||||||
|
d_output = dict()
|
||||||
|
for r in roa4:
|
||||||
|
d_output.setdefault('ipv4', list()).append(dict(zip(['asn', 'prefix', 'max'], r)))
|
||||||
|
for r in roa6:
|
||||||
|
d_output.setdefault('ipv6', list()).append(dict(zip(['asn', 'prefix', 'max'], r)))
|
||||||
|
output = json.dumps(d_output, indent=2)
|
||||||
|
else:
|
||||||
|
output += "# NeoNetwork ROA tool\n"
|
||||||
|
pattern = 'route %s max %d as %d;'
|
||||||
|
l_output = list()
|
||||||
|
for (asn, prefix, maxlen) in roa4:
|
||||||
|
l_output.append(pattern % (prefix, maxlen, asn))
|
||||||
|
for (asn, prefix, maxlen) in roa6:
|
||||||
|
l_output.append(pattern % (prefix, maxlen, asn))
|
||||||
|
output += '\n'.join(l_output)
|
||||||
|
if not args.output or args.output == '-':
|
||||||
|
print(output)
|
||||||
|
else:
|
||||||
|
Path(args.output).write_text(output)
|
||||||
|
print('written to', args.output)
|
Loading…
Reference in a new issue