You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

140 lines
4.3 KiB

#!/usr/bin/python
import socket
import subprocess
from pathlib import Path
from sys import argv
import re
import shutil
from time import time
# (host, port) of the whois service that whois() connects to.
WHOIS_SERVER = ("172.20.129.8", 43)

# Asterisk sound library and the stock prompt that main() copies as a
# fallback when speech synthesis fails.
ASTROOT = Path("/var/lib/asterisk/sounds")
ERRSOUND = ASTROOT.joinpath("en", "something-terribly-wrong.gsm")

# Directory that receives the generated audio files. It is created at import
# time; ROOTDIR itself must already exist because parents are not created.
ROOTDIR = Path("/var/tmp")
OUTDIR = ROOTDIR.joinpath("ast-dynamic")
OUTDIR.mkdir(exist_ok=True)
def cleanup(*files, outdir=None, max_age=60.0):
    """Delete the given files and purge stale files from the output directory.

    Several invocations of this script may clean the same directory at the
    same time, so a file that vanishes between being listed and being removed
    is treated as already cleaned up rather than as an error.

    Args:
        *files: ``Path`` objects to remove; paths that do not exist are
            skipped silently.
        outdir: Directory whose old regular files are purged. Defaults to
            the module-level ``OUTDIR``.
        max_age: Age in seconds, measured from the modification time, above
            which a regular file in *outdir* is deleted.

    Raises:
        OSError: If a file exists but cannot be inspected or removed
            (for example because of missing permissions).
    """
    if outdir is None:
        outdir = OUTDIR

    for path in files:
        # missing_ok avoids the exists()/unlink() race of a two-step check.
        path.unlink(missing_ok=True)

    now = time()
    for entry in outdir.iterdir():
        try:
            if entry.is_file() and now - entry.stat().st_mtime > max_age:
                entry.unlink()
        except FileNotFoundError:
            # Removed by a concurrent cleanup after iterdir() listed it.
            continue
def espeak(to_speak, outfile):
    """Synthesize speech for *to_speak* and write it to *outfile*.

    The text is piped into ``espeak-ng`` (voice ``en-us``, pitch 80), whose
    audio output on stdout is in turn piped into ``ffmpeg``, which converts
    it to a single channel at 8000 Hz and writes it to *outfile*.

    Args:
        to_speak: Object whose ``str()`` form is spoken.
        outfile: Destination path handed to ffmpeg.

    Raises:
        subprocess.CalledProcessError: If either program exits non-zero.
        subprocess.TimeoutExpired: If either program runs longer than 10 s.
    """
    synth_cmd = ["espeak-ng", "-v", "en-us", "--stdin", "-p", "80", "--stdout"]
    convert_cmd = [
        'ffmpeg', '-hide_banner', '-i', "-",
        "-vn", "-ac", "1", "-ar", "8000", f"{outfile}",
    ]
    # Both stages capture their output, fail on a non-zero exit status and
    # are limited to ten seconds each.
    run_opts = {"capture_output": True, "check": True, "timeout": 10}

    text = str(to_speak).encode("utf-8")
    synthesized = subprocess.run(synth_cmd, input=text, **run_opts)
    subprocess.run(convert_cmd, input=synthesized.stdout, **run_opts)
# Largest autonomous system number representable in four bytes.
_MAX_ASN = 0xFFFFFFFF

# Attributes left out of the spoken record (matched as line prefixes).
_SKIPPED_FIELDS = ('remarks', 'nserver', 'descr', 'ds-rdata', 'mp-import', 'mp-export')

# Spoken replacements for terse attribute names.
_FIELD_LABELS = {
    "aut-num": "Autonomous System Number",
    "as-name": "Autonomous System Name",
    "admin-c": "Administrative Contact",
    "tech-c": "Technical Contact",
    "mnt-by": "Maintained by",
}

# A line that introduces an attribute: non-blank name followed by a colon.
_ATTRIBUTE_RE = re.compile(r'^([\S]+):.*$')


def _spell_out(asn):
    """Comma-separate the characters of *asn*, dropping a leading "AS".

    Presumably this makes the synthesizer read the number digit by digit.
    """
    return ",".join(asn).replace("A,S,", "")


def _build_query(arg):
    """Validate *arg* and expand short codes into a full "AS<number>" query."""
    try:
        number = int(arg)
    except ValueError:
        number = None
    if number is None or not 0 <= number <= _MAX_ASN:
        raise ValueError(
            f"invalid autonomous system number {','.join(str(arg))}."
        )
    # Short codes are expanded by prepending a fixed prefix.
    if number <= 9999:
        return f"AS424242{number:04d}"
    if 20000 <= number <= 29999:
        return f"AS42424{number:05d}"
    if 70000 <= number <= 79999:
        return f"AS42012{number:05d}"
    return f"AS{number}"


def _fetch(query):
    """Send *query* to WHOIS_SERVER and return the whole reply as text."""
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) as conn:
        conn.settimeout(5)
        conn.connect(WHOIS_SERVER)
        # sendall() keeps writing until the request is complete, whereas
        # send() is allowed to transmit only part of it.
        conn.sendall(f"{query}\r\n".encode("utf-8"))
        chunks = []
        # The server signals the end of the reply by closing the connection.
        while chunk := conn.recv(1024):
            chunks.append(chunk)
    # A stray undecodable byte should not discard the whole record.
    return b"".join(chunks).decode("utf-8", errors="replace")


def _last_paragraph(lines):
    """Return the lines from the last blank-line-separated paragraph onwards."""
    seen_text = False
    for offset, line in enumerate(reversed(lines)):
        if line:
            seen_text = True
        elif seen_text:
            # First blank line above the trailing text: keep what follows it.
            return lines[-offset:]
    return lines


def _fold_continuations(lines):
    """Prefix continuation lines with the attribute name they belong to."""
    folded = []
    attr = ""
    for line in lines:
        if line.strip() and not line.startswith('%'):
            match = _ATTRIBUTE_RE.match(line)
            if match:
                attr = match.group(1)
            elif attr:
                line = f"{attr}:{line}"
            else:
                raise ValueError("malformed whois response")
        folded.append(line)
    return folded


def whois(arg):
    """Look up an autonomous system and return its record as speakable text.

    Args:
        arg: The AS number as a decimal string. 0-9999, 20000-29999 and
            70000-79999 are short codes that get a fixed prefix prepended;
            any other value is used as the AS number itself.

    Returns:
        The record's lines joined with ".\\n" and terminated by ".": an
        introductory sentence followed by the attributes that are not
        filtered out, or a single "no such Autonomous System" sentence when
        the reply contains no usable attribute.

    Raises:
        ValueError: If *arg* is not an integer in the range 0..2**32-1, or
            the reply starts with a continuation line.
        OSError: If the whois server cannot be reached or times out.
    """
    query = _build_query(arg)
    reply = _fetch(query)
    # Tolerate CRLF replies: a trailing "\r" would make blank lines look
    # non-empty and break the paragraph detection.
    lines = [line.rstrip("\r") for line in reply.split('\n')]
    record = _fold_continuations(_last_paragraph(lines))

    spoken = []
    for line in record:
        if not line.strip() or line.startswith('%'):
            continue
        if line.startswith(_SKIPPED_FIELDS):
            continue
        if line.startswith("aut-num:"):
            label, *values = line.split()
            line = " ".join([label, *map(_spell_out, values)])
        name, colon, rest = line.partition(":")
        if colon and name in _FIELD_LABELS:
            line = f"{_FIELD_LABELS[name]}:{rest}"
        spoken.append(line)

    if spoken:
        spoken.insert(0, "Found the following who is record from the dn42 registry.")
    else:
        spoken.append(f"There is no such Autonomous System {_spell_out(query)}")
    return ".\n".join(spoken) + "."
def main():
    """Look up an AS number and leave a spoken answer in ``OUTDIR``.

    Command-line arguments:
        argv[1]: AS number (or short code) to look up.
        argv[2]: Unique identifier used as the stem of the output file name.

    On success ``<id>.wav`` is written. If speech synthesis fails, the stock
    error prompt is copied to ``<id><ERRSOUND suffix>`` instead. A failed
    lookup is not fatal: its error message is spoken in place of the record.

    Raises:
        SystemExit: If fewer than two arguments were supplied.
    """
    if len(argv) < 3:
        prog = argv[0] if argv else "whois"
        raise SystemExit(f"usage: {prog} <as-number> <unique-id>")
    as_number = argv[1]
    # Keep only the final path component so the files that are deleted and
    # written below can never lie outside OUTDIR.
    unid = Path(argv[2]).name

    outfile = OUTDIR / f"{unid}.wav"
    errfile = OUTDIR / f"{unid}{ERRSOUND.suffix}"
    cleanup(outfile, errfile)

    # Top-level boundary: any lookup failure is turned into a spoken message.
    try:
        ret = whois(as_number)
    except Exception as e:
        ret = f"error, {e}"

    # Likewise, any synthesis failure falls back to the canned error sound.
    try:
        espeak(ret, outfile)
    except Exception:
        shutil.copy(ERRSOUND, errfile)


if __name__ == "__main__":
    main()