#!/usr/bin/python
"""Look up an autonomous system in a whois registry and render it as speech.

Usage: <script> AS_NUMBER UNIQUE_ID

The record is fetched from WHOIS_SERVER, reduced to a short list of
sentences and synthesised with espeak-ng + ffmpeg into
``OUTDIR/<UNIQUE_ID>.wav``.  If synthesis fails, ERRSOUND is copied to
``OUTDIR/<UNIQUE_ID><suffix of ERRSOUND>`` instead.
"""
import socket
import subprocess
from pathlib import Path
from sys import argv
import re
import shutil
from time import time

WHOIS_SERVER = ('172.20.129.8', 43)
ASTROOT = Path("/var/lib/asterisk/sounds")
ERRSOUND = ASTROOT / "en" / "something-terribly-wrong.gsm"
ROOTDIR = Path("/var/tmp")
OUTDIR = ROOTDIR / "ast-dynamic"
OUTDIR.mkdir(parents=True, exist_ok=True)

# Largest valid AS number (AS numbers are 32-bit).
_MAX_ASN = 2 ** 32 - 1
# Matches "attribute: value" lines; continuation lines do not match because
# they do not start with a non-whitespace run followed by a colon.
_ATTR_RE = re.compile(r'^([\S]+):.*$')
# Lines starting with any of these are never read out.
_SKIPPED_FIELDS = (
    'remarks', 'nserver', 'descr', 'ds-rdata', 'mp-import', 'mp-export',
)
# Attribute name -> wording used in the spoken output.
_SPOKEN_NAMES = {
    "aut-num": "Autonomous System Number",
    "as-name": "Autonomous System Name",
    "admin-c": "Administrative Contact",
    "tech-c": "Technical Contact",
    "mnt-by": "Maintained by",
}


def cleanup(*files):
    """Delete *files* and sweep stale output out of OUTDIR.

    Args:
        *files: ``Path`` objects to remove; paths that do not exist are
            ignored.

    Every regular file directly inside OUTDIR whose modification time is
    more than 60 seconds in the past is removed as well.

    Raises:
        OSError: if a file exists but cannot be removed or inspected.
    """
    for path in files:
        path.unlink(missing_ok=True)

    now = time()
    for entry in OUTDIR.iterdir():
        try:
            if entry.is_file() and now - entry.stat().st_mtime > 60.0:
                entry.unlink()
        except FileNotFoundError:
            # Another invocation sweeping the same directory may have
            # removed the entry first; that is the outcome we wanted.
            continue


def espeak(to_speak, outfile):
    """Synthesise *to_speak* and write it to *outfile* as 8 kHz mono audio.

    Args:
        to_speak: text to read out (converted with ``str()``).
        outfile: destination path handed to ffmpeg; the container format
            is chosen by ffmpeg from the file name.

    Raises:
        subprocess.CalledProcessError: if espeak-ng or ffmpeg exits non-zero.
        subprocess.TimeoutExpired: if either tool runs longer than 10 s.
        OSError: if either executable cannot be started.
    """
    synth = subprocess.run(
        ["espeak-ng", "-v", "en-us", "--stdin", "-p", "80", "--stdout"],
        input=str(to_speak).encode("utf-8"),
        capture_output=True,
        check=True,
        timeout=10,
    )
    # Resample espeak's output read from stdin to one channel at 8000 Hz.
    subprocess.run(
        ['ffmpeg', '-hide_banner', '-i', "-", "-vn", "-ac", "1",
         "-ar", "8000", f"{outfile}"],
        input=synth.stdout,
        capture_output=True,
        check=True,
        timeout=10,
    )


def _spell(text):
    """Return *text* as comma-separated characters without a leading "AS"."""
    return ",".join(text).replace("A,S,", "")


def _expand_asn(arg):
    """Validate *arg* and turn it into the ``AS<number>`` query string."""
    message = f"invalid autonomous system number {','.join(arg)}."
    try:
        number = int(arg)
    except ValueError as err:
        raise ValueError(message) from err
    # A negative value would be sent as "AS-<n>", which is not an AS number.
    if not 0 <= number <= _MAX_ASN:
        raise ValueError(message)

    # Short forms are expanded into their full-length numbers.
    if 0 <= number <= 9999:
        return f"AS424242{number:04d}"
    if 20000 <= number <= 29999:
        return f"AS42424{number:05d}"
    if 70000 <= number <= 79999:
        return f"AS42012{number:05d}"
    return f"AS{number}"


def _query(request):
    """Send *request* to WHOIS_SERVER and return the decoded reply."""
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) as s:
        s.settimeout(5)
        s.connect(WHOIS_SERVER)
        # sendall: a plain send() may transmit only part of the buffer.
        s.sendall(f"{request}\r\n".encode("utf-8"))
        chunks = []
        while chunk := s.recv(1024):
            chunks.append(chunk)
    return b"".join(chunks).decode('utf-8')


def _last_paragraph(lines):
    """Return the final blank-line-delimited group of *lines*.

    Trailing empty lines are kept in the result; when no blank line
    precedes the last text, all of *lines* is returned.
    """
    seen_text = False
    for offset, line in enumerate(reversed(lines)):
        if line:
            seen_text = True
        elif seen_text:
            return lines[-offset:]
    return list(lines)


def _prefix_continuations(lines):
    """Prefix continuation lines with the attribute name they belong to.

    Raises:
        ValueError: if a continuation line appears before any attribute.
    """
    result = []
    attr = ""
    for line in lines:
        if line and not line.startswith('%'):
            match = _ATTR_RE.match(line)
            if match:
                attr = match.group(1)
            elif attr:
                line = f"{attr}:{line}"
            else:
                raise ValueError("malformed whois response")
        result.append(line)
    return result


def _to_sentences(lines):
    """Drop unspoken lines and reword the remaining ones for speech."""
    sentences = []
    for line in lines:
        if not line.strip() or line.startswith('%'):
            continue
        if line.startswith(_SKIPPED_FIELDS):
            continue
        if line.startswith("aut-num:"):
            fields = line.split()
            # Spell the number digit by digit; a line that is not exactly
            # "aut-num: <value>" is left as it is instead of crashing.
            if len(fields) == 2:
                line = f"{fields[0]} {_spell(fields[1])}"
        for short, spoken in _SPOKEN_NAMES.items():
            prefix = f"{short}:"
            if line.startswith(prefix):
                line = f"{spoken}:{line[len(prefix):]}"
                break
        sentences.append(line)
    return sentences


def whois(arg):
    """Fetch the whois record for an AS number and format it for speech.

    Args:
        arg: the AS number as text.  Values in 0-9999, 20000-29999 and
            70000-79999 are expanded to ``AS424242nnnn``, ``AS42424nnnnn``
            and ``AS42012nnnnn``; anything else is queried as ``AS<n>``.

    Returns:
        The sentences to speak, each terminated by "." and separated by
        newlines.  When the reply contains nothing to read out, a single
        "There is no such Autonomous System ..." sentence.

    Raises:
        ValueError: if *arg* is not an integer between 0 and 2**32 - 1, or
            the reply starts with a continuation line.
        OSError: on connection failure or timeout.
        UnicodeDecodeError: if the reply is not valid UTF-8.
    """
    request = _expand_asn(arg)
    reply = _query(request)

    # Only the last paragraph of the reply is read out.
    record = _last_paragraph(reply.split('\n'))
    sentences = _to_sentences(_prefix_continuations(record))

    if not sentences:
        sentences.append(
            f"There is no such Autonomous System {_spell(request)}"
        )
    else:
        sentences.insert(
            0, "Found the following who is record from the dn42 registry."
        )
    return ".\n".join(sentences) + "."


def main():
    """Entry point: speak the whois record for argv[1] into a file.

    argv[1] is the AS number, argv[2] a unique identifier used as the
    output file stem inside OUTDIR.
    """
    if len(argv) < 3:
        raise SystemExit("usage: AS_NUMBER UNIQUE_ID")
    # NOTE(review): unid is used verbatim as a file name; confirm the caller
    # never passes path separators.
    unid = argv[2]
    outfile = OUTDIR / f"{unid}.wav"
    errfile = OUTDIR / f"{unid}{ERRSOUND.suffix}"
    cleanup(outfile, errfile)

    try:
        ret = whois(argv[1])
    except Exception as e:
        # Any lookup failure is read out to the listener instead.
        ret = f"error, {e}"

    try:
        espeak(ret, outfile)
    except Exception:
        shutil.copy(ERRSOUND, errfile)
        # Do not leave a truncated recording next to the error sound.
        outfile.unlink(missing_ok=True)


if __name__ == "__main__":
    main()