You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
140 lines
4.3 KiB
140 lines
4.3 KiB
#!/usr/bin/python |
|
|
|
import socket |
|
import subprocess |
|
from pathlib import Path |
|
from sys import argv |
|
import re |
|
import shutil |
|
from time import time |
|
|
|
WHOIS_SERVER = ('172.20.129.8', 43) |
|
ASTROOT = Path("/var/lib/asterisk/sounds") |
|
ERRSOUND = ASTROOT / "en" / "something-terribly-wrong.gsm" |
|
ROOTDIR = Path("/var/tmp") |
|
OUTDIR = ROOTDIR / "ast-dynamic" |
|
OUTDIR.mkdir(exist_ok=True) |
|
|
|
def cleanup(*files): |
|
try: |
|
for f in files: |
|
if f.exists(): |
|
f.unlink() |
|
now = time() |
|
for f in OUTDIR.iterdir(): |
|
if f.is_file() and now - f.stat().st_mtime > 60.0: |
|
f.unlink() |
|
except Exception: |
|
raise |
|
|
|
def espeak(to_speak, outfile): |
|
p1 = subprocess.run( |
|
["espeak-ng", "-v", "en-us", "--stdin", "-p", "80", "--stdout"], |
|
input=str(to_speak).encode("utf-8"), |
|
capture_output=True, |
|
check=True, |
|
timeout=10 |
|
) |
|
p2 = subprocess.run(['ffmpeg', '-hide_banner', '-i', "-", |
|
"-vn", "-ac", "1", "-ar", "8000", f"{outfile}"], |
|
input=p1.stdout, |
|
capture_output=True, |
|
check=True, |
|
timeout=10 |
|
) |
|
|
|
def whois(arg): |
|
try: |
|
i = int(arg) |
|
except ValueError: |
|
raise ValueError(f"invalid autonomous system number {','.join(list(arg))}.") |
|
else: |
|
if 0 <= i <= 9999: |
|
arg = f"AS424242{i:04d}" |
|
elif 20000 <= i <= 29999: |
|
arg = f"AS42424{i:05d}" |
|
elif 70000 <= i <= 79999: |
|
arg = f"AS42012{i:05d}" |
|
else: |
|
arg = f"AS{i}" |
|
with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) as s: |
|
s.settimeout(5) |
|
s.connect(WHOIS_SERVER) |
|
s.send((f"{arg}\r\n").encode("utf-8")) |
|
r = bytes() |
|
while len(_r := s.recv(1024)) != 0: |
|
r += _r |
|
r = r.decode('utf-8') |
|
lines = r.split('\n') |
|
target = None |
|
for (i, line) in enumerate(lines[::-1]): |
|
if line: |
|
target = False |
|
elif not line and target is False: |
|
target = -i |
|
break |
|
lines = lines[target:] |
|
def multiline_process(): |
|
attr = "" |
|
for i, line in enumerate(lines): |
|
if not line: |
|
continue |
|
if line.startswith('%'): |
|
continue |
|
if m := re.match(r'^([\S]+):.*$', line): |
|
attr = m.groups()[0] |
|
assert attr |
|
else: |
|
assert attr |
|
lines[i] = f"{attr}:{lines[i]}" |
|
try: |
|
multiline_process() |
|
except Exception as e: |
|
raise |
|
whois_filter = lambda x: any([x.startswith(field) for field in {'remarks', 'nserver', 'descr', 'ds-rdata', 'mp-import', 'mp-export'}]) |
|
lines = [l for l in lines if l and not whois_filter(l)] |
|
lines_new = list() |
|
repl = { |
|
"aut-num": "Autonomous System Number", |
|
"as-name": "Autonomous System Name", |
|
"admin-c": "Administrative Contact", |
|
"tech-c": "Technical Contact", |
|
"mnt-by": "Maintained by", |
|
} |
|
for l in lines: |
|
if not l.strip(): |
|
continue |
|
elif l.startswith('%'): |
|
continue |
|
elif l.startswith("aut-num:"): |
|
_p, _s = l.split() |
|
_s = ",".join(list(_s)).replace("A,S,", "") |
|
l = f"{_p} {_s}" |
|
for fr, to in repl.items(): |
|
if l.startswith(f"{fr}:"): |
|
l = l.replace(f"{fr}:", f"{to}:", 1) |
|
lines_new.append(l) |
|
|
|
if not lines_new: |
|
lines_new.append(f"There is no such Autonomous System {','.join(list(arg)).replace('A,S,', '')}") |
|
else: |
|
lines_new.insert(0, "Found the following who is record from the dn42 registry.") |
|
return ".\n".join(lines_new)+"." |
|
|
|
def main(): |
|
# argv[1]: as number |
|
# argv[2]: unique identifier |
|
unid = argv[2] |
|
outfile = OUTDIR / f"{unid}.wav" |
|
errfile = OUTDIR / f"{unid}{ERRSOUND.suffix}" |
|
cleanup(outfile, errfile) |
|
try: |
|
ret = whois(argv[1]) |
|
except Exception as e: |
|
ret = f"error, {e}" |
|
try: |
|
espeak(ret, outfile) |
|
except Exception: |
|
shutil.copy(ERRSOUND, errfile) |
|
if __name__ == "__main__": |
|
main()
|
|
|