replace old readline mechanism with a new one since ld_proload is clearly not the way forward

This commit is contained in:
JerryXiao 2022-12-03 18:41:48 +08:00
parent 7d2d3caee2
commit 6169a637bf
Signed by: Jerry
GPG key ID: 22618F758B5BE2E5
2 changed files with 81 additions and 37 deletions

View file

@ -161,7 +161,7 @@ def do_system_upgrade(debug=False, interactive=False) -> checkReport:
stdout_handler = logging.handlers.RotatingFileHandler(LOG_DIR / "stdout.log", mode='a', stdout_handler = logging.handlers.RotatingFileHandler(LOG_DIR / "stdout.log", mode='a',
maxBytes=10*1024**2, backupCount=2) maxBytes=10*1024**2, backupCount=2)
stdout_handler.setFormatter(_formatter) stdout_handler.setFormatter(_formatter)
stdout_handler.setLevel(logging.DEBUG) stdout_handler.setLevel(logging.DEBUG+1)
except Exception: except Exception:
logging.exception(f"unable to save stdout to {LOG_DIR}") logging.exception(f"unable to save stdout to {LOG_DIR}")
stdout_handler = None stdout_handler = None
@ -296,6 +296,7 @@ def main() -> None:
args = parser.parse_args() args = parser.parse_args()
_log_format = '%(asctime)s - %(module)s - %(funcName)s - %(levelname)s - %(message)s' if args.debug else '%(levelname)s - %(message)s' _log_format = '%(asctime)s - %(module)s - %(funcName)s - %(levelname)s - %(message)s' if args.debug else '%(levelname)s - %(message)s'
logging.basicConfig(level=logging.DEBUG, format=_log_format) logging.basicConfig(level=logging.DEBUG, format=_log_format)
logging.addLevelName(logging.DEBUG+1, 'DEBUG+1')
if not args.debug: if not args.debug:
assert len(logger.handlers) == 1 assert len(logger.handlers) == 1
logger.handlers[0].setLevel(logging.INFO) logger.handlers[0].setLevel(logging.INFO)

View file

@ -1,16 +1,23 @@
import subprocess import subprocess
from threading import Thread from threading import Thread
import logging import logging
from typing import List, BinaryIO, Iterator, Union from typing import List, BinaryIO, Iterator, Union, Callable
from io import DEFAULT_BUFFER_SIZE from io import DEFAULT_BUFFER_SIZE
from time import mktime from time import mktime
from datetime import datetime from datetime import datetime
from signal import SIGINT, SIGTERM, Signals from signal import SIGINT, SIGTERM, Signals
from select import select from select import select
from sys import stdin from sys import stdin
from os import environ from os import set_blocking, close as os_close
from pty import openpty
from re import compile
logger = logging.getLogger() logger = logging.getLogger()
ANSI_ESCAPE = compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
# https://stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
# 0a, 0d and 1b need special process
GENERAL_NON_PRINTABLE = {b'\x07', b'\x08', b'\x09', b'\x0b', b'\x0c', b'\x7f'}
class UnknownQuestionError(subprocess.SubprocessError): class UnknownQuestionError(subprocess.SubprocessError):
def __init__(self, question, output=None): def __init__(self, question, output=None):
self.question = question self.question = question
@ -30,7 +37,7 @@ def execute_with_io(command: List[str], timeout: int = 3600, interactive: bool =
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
logger.critical(f'unable to terminate {p}, killing') logger.critical(f'unable to terminate {p}, killing')
p.kill() p.kill()
def set_timeout(p: subprocess.Popen, timeout: int) -> None: def set_timeout(p: subprocess.Popen, timeout: int, callback: Callable = lambda: None) -> None:
try: try:
p.wait(timeout=timeout) p.wait(timeout=timeout)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
@ -38,45 +45,81 @@ def execute_with_io(command: List[str], timeout: int = 3600, interactive: bool =
terminate(p) terminate(p)
else: else:
logger.debug('set_timeout exit') logger.debug('set_timeout exit')
linebuf_env = dict(environ) finally:
linebuf_env['_STDBUF_O'] = 'L' callback()
linebuf_env['_STDBUF_E'] = 'L' ptymaster, ptyslave = openpty()
linebuf_env['LD_PRELOAD'] = '/usr/lib/coreutils/libstdbuf.so' set_blocking(ptymaster, False)
stdout = open(ptymaster, "rb", buffering=0)
stdin = open(ptymaster, "w")
p = subprocess.Popen( p = subprocess.Popen(
command, command,
stdin=subprocess.PIPE, stdin=ptyslave,
stdout=subprocess.PIPE, stdout=ptyslave,
stderr=subprocess.STDOUT, stderr=ptyslave,
encoding='utf-8', )
env=linebuf_env logger.log(logging.DEBUG+1, f"running {command}")
)
logger.debug(f"running {command}")
try: try:
Thread(target=set_timeout, args=(p, timeout), daemon=True).start() def cleanup():
line = '' actions = (
(stdin, "close"),
(stdout, "close"),
(ptymaster, os_close),
(ptyslave, os_close),
)
for obj, action in actions:
try:
if isinstance(action, str):
getattr(obj, action)()
else:
action(obj)
except OSError:
pass
Thread(target=set_timeout, args=(p, timeout, cleanup), daemon=True).start()
output = '' output = ''
while (r := p.stdout.read(1)) != '': while p.poll() is None:
output += r try:
line += r select([ptymaster], list(), list())
if r == '\n': _raw = stdout.read()
logger.debug('STDOUT: %s', line[:-1]) except (OSError, ValueError):
line = '' # should be cleanup routine closed the fd, lets check the process return code
if line == ':: Proceed with installation? [Y/n]': continue
p.stdin.write('y\n') if not _raw:
p.stdin.flush() logger.debug('read void from stdout')
elif line.lower().endswith('[y/n]') or line == 'Enter a number (default=1): ': continue
if interactive: logger.debug(f"raw stdout: {_raw}")
choice = ask_interactive_question(line, info=output) for b in GENERAL_NON_PRINTABLE:
if choice is None: _raw = _raw.replace(b, b'')
need_attention = b'\x1b[?25h' in _raw
raw = _raw.decode('utf-8', errors='replace')
raw = raw.replace('\r\n', '\n')
raw = ANSI_ESCAPE.sub('', raw)
output += raw
rawl = (raw[:-1] if raw.endswith('\n') else raw).split('\n')
for l in rawl:
logger.log(logging.DEBUG+1, 'STDOUT: %s', l)
rstrip1 = lambda x: x[:-1] if x.endswith(' ') else x
for l in rawl:
line = rstrip1(l)
if line == ':: Proceed with installation? [Y/n]':
need_attention = False
stdin.write('y\n')
stdin.flush()
elif line.lower().endswith('[y/n]') or line == 'Enter a number (default=1):':
need_attention = False
if interactive:
choice = ask_interactive_question(line, info=output)
if choice is None:
terminate(p, signal=SIGINT)
raise UnknownQuestionError(line, output)
elif choice:
stdin.write(f"{choice}\n")
stdin.flush()
else:
terminate(p, signal=SIGINT) terminate(p, signal=SIGINT)
raise UnknownQuestionError(line, output) raise UnknownQuestionError(line, output)
elif choice: if need_attention and raw:
p.stdin.write(f"{choice}\n") raise UnknownQuestionError("<caused by show cursor sequence>", output)
p.stdin.flush() except (KeyboardInterrupt, UnknownQuestionError):
else:
terminate(p, signal=SIGINT)
raise UnknownQuestionError(line, output)
except KeyboardInterrupt:
terminate(p, signal=SIGINT) terminate(p, signal=SIGINT)
raise raise
except Exception: except Exception: