mirror of
https://github.com/isjerryxiao/pacroller.git
synced 2024-11-15 04:42:24 +08:00
pacman may ask for a number
This commit is contained in:
parent
792bab23bd
commit
2fdb21e2cc
2 changed files with 29 additions and 28 deletions
|
@ -131,7 +131,8 @@ def upgrade(interactive=False) -> List[str]:
|
||||||
raise PackageHold(errors)
|
raise PackageHold(errors)
|
||||||
except PackageHold as e:
|
except PackageHold as e:
|
||||||
if interactive:
|
if interactive:
|
||||||
if ask_interactive_question(f"{e}, continue?"):
|
user_input = ask_interactive_question(f"{e}, continue? ")
|
||||||
|
if user_input and user_input.lower().startswith('y'):
|
||||||
logger.warning("user determined to continue")
|
logger.warning("user determined to continue")
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -8,6 +8,7 @@ from datetime import datetime
|
||||||
from signal import SIGINT, SIGTERM, Signals
|
from signal import SIGINT, SIGTERM, Signals
|
||||||
from select import select
|
from select import select
|
||||||
from sys import stdin
|
from sys import stdin
|
||||||
|
from os import environ
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
class UnknownQuestionError(subprocess.SubprocessError):
|
class UnknownQuestionError(subprocess.SubprocessError):
|
||||||
|
@ -37,12 +38,17 @@ def execute_with_io(command: List[str], timeout: int = 3600, interactive: bool =
|
||||||
terminate(p)
|
terminate(p)
|
||||||
else:
|
else:
|
||||||
logger.debug('set_timeout exit')
|
logger.debug('set_timeout exit')
|
||||||
|
linebuf_env = dict(environ)
|
||||||
|
linebuf_env['_STDBUF_O'] = 'L'
|
||||||
|
linebuf_env['_STDBUF_E'] = 'L'
|
||||||
|
linebuf_env['LD_PRELOAD'] = '/usr/lib/coreutils/libstdbuf.so'
|
||||||
p = subprocess.Popen(
|
p = subprocess.Popen(
|
||||||
command,
|
command,
|
||||||
stdin=subprocess.PIPE,
|
stdin=subprocess.PIPE,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
encoding='utf-8'
|
encoding='utf-8',
|
||||||
|
env=linebuf_env
|
||||||
)
|
)
|
||||||
logger.debug(f"running {command}")
|
logger.debug(f"running {command}")
|
||||||
try:
|
try:
|
||||||
|
@ -55,25 +61,21 @@ def execute_with_io(command: List[str], timeout: int = 3600, interactive: bool =
|
||||||
if r == '\n':
|
if r == '\n':
|
||||||
logger.debug('STDOUT: %s', line[:-1])
|
logger.debug('STDOUT: %s', line[:-1])
|
||||||
line = ''
|
line = ''
|
||||||
elif r == ']':
|
if line == ':: Proceed with installation? [Y/n]':
|
||||||
if line == ':: Proceed with installation? [Y/n]':
|
p.stdin.write('y\n')
|
||||||
p.stdin.write('y\n')
|
p.stdin.flush()
|
||||||
p.stdin.flush()
|
elif line.lower().endswith('[y/n]') or line == 'Enter a number (default=1): ':
|
||||||
elif line.lower().endswith('[y/n]'):
|
if interactive:
|
||||||
if interactive:
|
choice = ask_interactive_question(line, info=output)
|
||||||
choice = ask_interactive_question(line, info=output)
|
if choice is None:
|
||||||
if choice is None:
|
|
||||||
terminate(p, signal=SIGINT)
|
|
||||||
raise UnknownQuestionError(line, output)
|
|
||||||
elif choice:
|
|
||||||
p.stdin.write('y\n')
|
|
||||||
p.stdin.flush()
|
|
||||||
else:
|
|
||||||
p.stdin.write('n\n')
|
|
||||||
p.stdin.flush()
|
|
||||||
else:
|
|
||||||
terminate(p, signal=SIGINT)
|
terminate(p, signal=SIGINT)
|
||||||
raise UnknownQuestionError(line, output)
|
raise UnknownQuestionError(line, output)
|
||||||
|
elif choice:
|
||||||
|
p.stdin.write(f"{choice}\n")
|
||||||
|
p.stdin.flush()
|
||||||
|
else:
|
||||||
|
terminate(p, signal=SIGINT)
|
||||||
|
raise UnknownQuestionError(line, output)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
terminate(p, signal=SIGINT)
|
terminate(p, signal=SIGINT)
|
||||||
raise
|
raise
|
||||||
|
@ -106,19 +108,17 @@ def back_readline(fp: BinaryIO) -> Iterator[str]:
|
||||||
pos = next
|
pos = next
|
||||||
yield blines.pop(-1).decode('utf-8')
|
yield blines.pop(-1).decode('utf-8')
|
||||||
|
|
||||||
def ask_interactive_question(question: str = "", timeout: int = 60, info: str = "") -> Union[bool, None]:
|
def ask_interactive_question(question: str = "", timeout: int = 60, info: str = "") -> Union[str, None]:
|
||||||
''' on timeout, returns None '''
|
''' on timeout, returns None '''
|
||||||
print(f"Please answer this question in {timeout} seconds:\n{question}", end=' ', flush=True)
|
if info:
|
||||||
|
print(info)
|
||||||
|
print(f"Please answer this question in {timeout} seconds:\n{question}", end='', flush=True)
|
||||||
while True:
|
while True:
|
||||||
read_ready, _, _ = select([stdin], list(), list(), timeout)
|
read_ready, _, _ = select([stdin], list(), list(), timeout)
|
||||||
if not read_ready:
|
if not read_ready:
|
||||||
return None
|
return None
|
||||||
choice = read_ready[0].readline().strip()
|
choice = read_ready[0].readline().strip()
|
||||||
if choice.lower().startswith('y'):
|
if choice:
|
||||||
return True
|
return choice
|
||||||
elif choice.lower().startswith('n'):
|
|
||||||
return False
|
|
||||||
else:
|
else:
|
||||||
if info and choice.lower().startswith('i'):
|
print('Please give an explicit answer: ', end='', flush=True)
|
||||||
print(info)
|
|
||||||
print(f"Please give an explicit answer [Y]es [N]o{' [I]nfo' if info else ''}", end=' ', flush=True)
|
|
||||||
|
|
Loading…
Reference in a new issue