137 lines
4.2 KiB
Python
137 lines
4.2 KiB
Python
#!/usr/bin/python
|
|
|
|
import socket
|
|
import subprocess
|
|
from pathlib import Path
|
|
from sys import argv
|
|
import re
|
|
import shutil
|
|
from time import time
|
|
|
|
# (host, port) pair that whois() connects to over TCP to query the registry.
WHOIS_SERVER = ('172.20.129.8', 43)
# Root of the sound file tree; in this file it is only used to locate ERRSOUND.
ASTROOT = Path("/var/lib/asterisk/sounds")
# Fallback recording that main() copies into OUTDIR when speech synthesis fails.
ERRSOUND = ASTROOT / "en" / "something-terribly-wrong.gsm"
ROOTDIR = Path("/var/tmp")
# Directory where espeak() writes generated audio and cleanup() prunes old files.
OUTDIR = ROOTDIR / "ast-dynamic"
# Runs at import time; exist_ok keeps it from failing when the directory already exists.
OUTDIR.mkdir(exist_ok=True)
|
|
|
|
def cleanup(outdir=None, max_age=60.0):
    """Delete generated files that have not been modified recently.

    Only regular files directly inside the directory are considered;
    sub-directories are left alone.

    Args:
        outdir: Directory to prune. Defaults to the module-level ``OUTDIR``.
        max_age: Age in seconds (based on the file's mtime) above which a
            file is removed. Defaults to 60 seconds.

    Raises:
        OSError: If the directory cannot be listed or a file cannot be
            removed for a reason other than it having already vanished.
    """
    directory = OUTDIR if outdir is None else Path(outdir)
    now = time()
    for entry in directory.iterdir():
        try:
            if entry.is_file() and now - entry.stat().st_mtime > max_age:
                entry.unlink()
        except FileNotFoundError:
            # Several invocations may prune the same directory at once; a
            # file that disappeared between listing and stat()/unlink() is
            # already gone, which is exactly what we wanted.
            continue
|
|
|
|
def espeak(to_speak, unid):
    """Synthesize ``to_speak`` and store it as ``<unid>.wav`` inside OUTDIR.

    The text is piped through ``espeak-ng`` (en-us voice, pitch 80), and the
    resulting audio is piped through ``ffmpeg``, which writes a mono, 8 kHz
    file, overwriting any existing file of the same name.

    Args:
        to_speak: Object whose ``str()`` form is spoken.
        unid: Identifier used as the output file's base name.

    Raises:
        subprocess.CalledProcessError: If either tool exits with a non-zero
            status.
        subprocess.TimeoutExpired: If either tool runs longer than 10 seconds.
    """
    synth_cmd = ["espeak-ng", "-v", "en-us", "--stdin", "-p", "80", "--stdout"]
    text_bytes = str(to_speak).encode("utf-8")
    synthesized = subprocess.run(
        synth_cmd,
        input=text_bytes,
        capture_output=True,
        check=True,
        timeout=10,
    )

    wav_path = OUTDIR / f"{unid}.wav"
    convert_cmd = [
        'ffmpeg', '-hide_banner', '-i', "-",
        '-vn', "-ac", "1", "-ar", "8000", "-y",
        f"{wav_path}",
    ]
    # The converter reads the synthesized audio from stdin ("-i -").
    subprocess.run(
        convert_cmd,
        input=synthesized.stdout,
        capture_output=True,
        check=True,
        timeout=10,
    )
|
|
|
|
def whois(arg):
|
|
try:
|
|
i = int(arg)
|
|
except ValueError:
|
|
raise ValueError(f"invalid autonomous system number {','.join(list(arg))}.")
|
|
else:
|
|
if 0 <= i <= 9999:
|
|
arg = f"AS424242{i:04d}"
|
|
elif 20000 <= i <= 29999:
|
|
arg = f"AS42424{i:05d}"
|
|
elif 70000 <= i <= 79999:
|
|
arg = f"AS42012{i:05d}"
|
|
else:
|
|
arg = f"AS{i}"
|
|
with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) as s:
|
|
s.settimeout(5)
|
|
s.connect(WHOIS_SERVER)
|
|
s.send((f"{arg}\r\n").encode("utf-8"))
|
|
r = bytes()
|
|
while len(_r := s.recv(1024)) != 0:
|
|
r += _r
|
|
r = r.decode('utf-8')
|
|
lines = r.split('\n')
|
|
target = None
|
|
for (i, line) in enumerate(lines[::-1]):
|
|
if line:
|
|
target = False
|
|
elif not line and target is False:
|
|
target = -i
|
|
break
|
|
lines = lines[target:]
|
|
def multiline_process():
|
|
attr = ""
|
|
for i, line in enumerate(lines):
|
|
if not line:
|
|
continue
|
|
if line.startswith('%'):
|
|
continue
|
|
if m := re.match(r'^([\S]+):.*$', line):
|
|
attr = m.groups()[0]
|
|
assert attr
|
|
else:
|
|
assert attr
|
|
lines[i] = f"{attr}:{lines[i]}"
|
|
try:
|
|
multiline_process()
|
|
except Exception as e:
|
|
raise
|
|
whois_filter = lambda x: any([x.startswith(field) for field in {'remarks', 'nserver', 'descr', 'ds-rdata', 'mp-import', 'mp-export'}])
|
|
lines = [l for l in lines if l and not whois_filter(l)]
|
|
lines_new = list()
|
|
repl = {
|
|
"aut-num": "Autonomous System Number",
|
|
"as-name": "Autonomous System Name",
|
|
"admin-c": "Administrative Contact",
|
|
"tech-c": "Technical Contact",
|
|
"mnt-by": "Maintained by",
|
|
}
|
|
for l in lines:
|
|
if not l.strip():
|
|
continue
|
|
elif l.startswith('%'):
|
|
continue
|
|
elif l.startswith("aut-num:"):
|
|
_p, _s = l.split()
|
|
_s = ",".join(list(_s)).replace("A,S,", "")
|
|
l = f"{_p} {_s}"
|
|
for fr, to in repl.items():
|
|
if l.startswith(f"{fr}:"):
|
|
l = l.replace(f"{fr}:", f"{to}:", 1)
|
|
lines_new.append(l)
|
|
|
|
if not lines_new:
|
|
lines_new.append(f"There is no such Autonomous System {','.join(list(arg)).replace('A,S,', '')}")
|
|
else:
|
|
lines_new.insert(0, "Found the following who is record from the dn42 registry.")
|
|
return ".\n".join(lines_new)+"."
|
|
|
|
def main():
    """Answer one lookup: query the registry and render the answer as audio.

    Command-line arguments:
        argv[1]: AS number to look up.
        argv[2]: Unique identifier used as the base name of the output file.

    A failed lookup is spoken as ``"error, <reason>"``. If speech synthesis
    itself fails, the pre-recorded ERRSOUND file is copied into OUTDIR under
    the same identifier instead.

    Raises:
        SystemExit: If fewer than two arguments were supplied.
    """
    if len(argv) < 3:
        prog = argv[0] if argv else "whois"
        raise SystemExit(f"usage: {prog} <as-number> <unique-id>")
    as_number, unique_id = argv[1], argv[2]

    try:
        cleanup()
    except OSError as err:
        # Housekeeping must not stop the caller from getting an answer.
        from sys import stderr
        print(f"cleanup failed: {err}", file=stderr)

    try:
        message = whois(as_number)
    except Exception as err:
        message = f"error, {err}"

    try:
        espeak(message, unique_id)
    except Exception:
        fallback = OUTDIR / f"{unique_id}{ERRSOUND.suffix}"
        shutil.copy(ERRSOUND, fallback)


if __name__ == "__main__":
    main()
|