diff --git a/src/pacroller/main.py b/src/pacroller/main.py index 1e13901..a7a414e 100644 --- a/src/pacroller/main.py +++ b/src/pacroller/main.py @@ -131,7 +131,8 @@ def upgrade(interactive=False) -> List[str]: raise PackageHold(errors) except PackageHold as e: if interactive: - if ask_interactive_question(f"{e}, continue?"): + user_input = ask_interactive_question(f"{e}, continue? ") + if user_input and user_input.lower().startswith('y'): logger.warning("user determined to continue") else: raise diff --git a/src/pacroller/utils.py b/src/pacroller/utils.py index 77f2bb9..0ab2e0d 100644 --- a/src/pacroller/utils.py +++ b/src/pacroller/utils.py @@ -8,6 +8,7 @@ from datetime import datetime from signal import SIGINT, SIGTERM, Signals from select import select from sys import stdin +from os import environ logger = logging.getLogger() class UnknownQuestionError(subprocess.SubprocessError): @@ -37,12 +38,17 @@ def execute_with_io(command: List[str], timeout: int = 3600, interactive: bool = terminate(p) else: logger.debug('set_timeout exit') + linebuf_env = dict(environ) + linebuf_env['_STDBUF_O'] = 'L' + linebuf_env['_STDBUF_E'] = 'L' + linebuf_env['LD_PRELOAD'] = '/usr/lib/coreutils/libstdbuf.so' p = subprocess.Popen( command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - encoding='utf-8' + encoding='utf-8', + env=linebuf_env ) logger.debug(f"running {command}") try: @@ -55,25 +61,21 @@ def execute_with_io(command: List[str], timeout: int = 3600, interactive: bool = if r == '\n': logger.debug('STDOUT: %s', line[:-1]) line = '' - elif r == ']': - if line == ':: Proceed with installation? [Y/n]': - p.stdin.write('y\n') - p.stdin.flush() - elif line.lower().endswith('[y/n]'): - if interactive: - choice = ask_interactive_question(line, info=output) - if choice is None: - terminate(p, signal=SIGINT) - raise UnknownQuestionError(line, output) - elif choice: - p.stdin.write('y\n') - p.stdin.flush() - else: - p.stdin.write('n\n') - p.stdin.flush() - else: + if line == ':: Proceed with installation? [Y/n]': + p.stdin.write('y\n') + p.stdin.flush() + elif line.lower().endswith('[y/n]') or line == 'Enter a number (default=1): ': + if interactive: + choice = ask_interactive_question(line, info=output) + if choice is None: terminate(p, signal=SIGINT) raise UnknownQuestionError(line, output) + elif choice: + p.stdin.write(f"{choice}\n") + p.stdin.flush() + else: + terminate(p, signal=SIGINT) + raise UnknownQuestionError(line, output) except KeyboardInterrupt: terminate(p, signal=SIGINT) raise @@ -106,19 +108,17 @@ def back_readline(fp: BinaryIO) -> Iterator[str]: pos = next yield blines.pop(-1).decode('utf-8') -def ask_interactive_question(question: str = "", timeout: int = 60, info: str = "") -> Union[bool, None]: +def ask_interactive_question(question: str = "", timeout: int = 60, info: str = "") -> Union[str, None]: ''' on timeout, returns None ''' - print(f"Please answer this question in {timeout} seconds:\n{question}", end=' ', flush=True) + if info: + print(info) + print(f"Please answer this question in {timeout} seconds:\n{question}", end='', flush=True) while True: read_ready, _, _ = select([stdin], list(), list(), timeout) if not read_ready: return None choice = read_ready[0].readline().strip() - if choice.lower().startswith('y'): - return True - elif choice.lower().startswith('n'): - return False + if choice: + return choice else: - if info and choice.lower().startswith('i'): - print(info) - print(f"Please give an explicit answer [Y]es [N]o{' [I]nfo' if info else ''}", end=' ', flush=True) + print('Please give an explicit answer: ', end='', flush=True)