improve arg parsing, interactive questions

This commit is contained in:
JerryXiao 2021-04-21 18:02:49 +08:00
parent c07da5c301
commit 16263fdc72
Signed by: Jerry
GPG key ID: 22618F758B5BE2E5
4 changed files with 69 additions and 31 deletions

View file

@ -6,9 +6,11 @@
"sync_shell": "sync.sh", "sync_shell": "sync.sh",
"extra_safe": false, "extra_safe": false,
"shell": "/bin/bash", "shell": "/bin/bash",
"save_stdout": true,
"hold": { "hold": {
"linux": "(.*)", "linux": "(.*)",
"python": "[0-9]+[.]([0-9]+)[.][0-9]+[-][0-9]+" "python": "[0-9]+[.]([0-9]+)[.][0-9]+[-][0-9]+",
"pacman": "([0-9]+)[.][0-9]+[.][0-9]+[-][0-9]+"
}, },
"ignored_pacnew": [ "ignored_pacnew": [
"/etc/locale.gen", "/etc/locale.gen",

View file

@ -9,6 +9,7 @@ CONFIG_FILE = 'config.json'
F_KNOWN_OUTPUT_OVERRIDE = 'known_output_override.py' F_KNOWN_OUTPUT_OVERRIDE = 'known_output_override.py'
LIB_DIR = Path('/var/lib/pacroller') LIB_DIR = Path('/var/lib/pacroller')
DB_FILE = 'db' DB_FILE = 'db'
LOG_DIR = Path('/var/log/pacroller')
PACMAN_CONFIG = '/etc/pacman.conf' PACMAN_CONFIG = '/etc/pacman.conf'
PACMAN_LOG = '/var/log/pacman.log' PACMAN_LOG = '/var/log/pacman.log'
PACMAN_PKG_DIR = '/var/cache/pacman/pkg' PACMAN_PKG_DIR = '/var/cache/pacman/pkg'
@ -46,6 +47,7 @@ if CUSTOM_SYNC:
EXTRA_SAFE = bool(_config.get('extra_safe', False)) EXTRA_SAFE = bool(_config.get('extra_safe', False))
SHELL = str(_config.get('shell', '/bin/bash')) SHELL = str(_config.get('shell', '/bin/bash'))
SAVE_STDOUT = bool(_config.get('save_stdout', False))
HOLD = _config.get('hold', dict()) HOLD = _config.get('hold', dict())
for (k, v) in HOLD.items(): for (k, v) in HOLD.items():

View file

@ -7,15 +7,16 @@ from re import match
import json import json
from os import environ, getuid, isatty from os import environ, getuid, isatty
import traceback import traceback
from datetime import datetime
import pyalpm import pyalpm
import pycman import pycman
from typing import List, Iterator from typing import List, Iterator
from pacroller.utils import execute_with_io, UnknownQuestionError, back_readline from pacroller.utils import execute_with_io, UnknownQuestionError, back_readline, ask_interactive_question
from pacroller.checker import log_checker, sync_err_is_net, upgrade_err_is_net, checkReport from pacroller.checker import log_checker, sync_err_is_net, upgrade_err_is_net, checkReport
from pacroller.config import (CONFIG_DIR, CONFIG_FILE, LIB_DIR, DB_FILE, PACMAN_LOG, PACMAN_CONFIG, from pacroller.config import (CONFIG_DIR, CONFIG_FILE, LIB_DIR, DB_FILE, PACMAN_LOG, PACMAN_CONFIG,
TIMEOUT, UPGRADE_TIMEOUT, NETWORK_RETRY, CUSTOM_SYNC, SYNC_SH, TIMEOUT, UPGRADE_TIMEOUT, NETWORK_RETRY, CUSTOM_SYNC, SYNC_SH,
EXTRA_SAFE, SHELL, HOLD, NEEDRESTART, NEEDRESTART_CMD, SYSTEMD, EXTRA_SAFE, SHELL, HOLD, NEEDRESTART, NEEDRESTART_CMD, SYSTEMD,
PACMAN_PKG_DIR, PACMAN_SCC, PACMAN_DB_LCK) PACMAN_PKG_DIR, PACMAN_SCC, PACMAN_DB_LCK, SAVE_STDOUT, LOG_DIR)
logger = logging.getLogger() logger = logging.getLogger()
@ -89,6 +90,7 @@ def upgrade(interactive=False) -> List[str]:
exit(0) exit(0)
else: else:
def examine_upgrade(toadd: List[pyalpm.Package], toremove: List[pyalpm.Package]) -> None: def examine_upgrade(toadd: List[pyalpm.Package], toremove: List[pyalpm.Package]) -> None:
errors = list()
for pkg in toadd: for pkg in toadd:
localpkg: pyalpm.Package = localdb.get_pkg(pkg.name) localpkg: pyalpm.Package = localdb.get_pkg(pkg.name)
localver = localpkg.version if localpkg else "" localver = localpkg.version if localpkg else ""
@ -98,14 +100,25 @@ def upgrade(interactive=False) -> List[str]:
_m_new = match(_testreg, pkg.version) _m_new = match(_testreg, pkg.version)
if _m_old and _m_new: if _m_old and _m_new:
if (o := _m_old.groups()) != (n := _m_new.groups()): if (o := _m_old.groups()) != (n := _m_new.groups()):
raise PackageHold(f"hold package {pkg.name} is going to be upgraded from {o=} to {n=}") errors.append(f"hold package {pkg.name} is going to be upgraded from {o=} to {n=}")
else: else:
raise PackageHold(f"cannot match version regex for hold package {pkg.name}") errors.append(f"cannot match version regex for hold package {pkg.name}")
for pkg in toremove: for pkg in toremove:
logger.debug(f"will remove {pkg.name} version {pkg.version}") logger.debug(f"will remove {pkg.name} version {pkg.version}")
if pkg.name in HOLD: if pkg.name in HOLD:
raise PackageHold(f"attempt to remove {pkg.name} which is set to hold") errors.append(f"attempt to remove {pkg.name} which is set to hold")
examine_upgrade(t.to_add, t.to_remove) if errors:
raise PackageHold(errors)
try:
examine_upgrade(t.to_add, t.to_remove)
except Exception as e:
if interactive:
if ask_interactive_question(f"{e}, continue?"):
logger.warning("user determined to continue")
else:
raise
else:
raise
finally: finally:
t.release() t.release()
pacman_output = execute_with_io(['pacman', '-Su', '--noprogressbar', '--color', 'never'], UPGRADE_TIMEOUT, interactive=interactive) pacman_output = execute_with_io(['pacman', '-Su', '--noprogressbar', '--color', 'never'], UPGRADE_TIMEOUT, interactive=interactive)
@ -138,6 +151,15 @@ def do_system_upgrade(debug=False, interactive=False) -> checkReport:
else: else:
raise MaxRetryReached(f'upgrade failed {NETWORK_RETRY} times') raise MaxRetryReached(f'upgrade failed {NETWORK_RETRY} times')
if SAVE_STDOUT:
filename = datetime.now().astimezone().isoformat(timespec='seconds').replace(':', '_') + ".log"
logger.debug(f"saving stdout to {filename}")
try:
LOG_DIR.mkdir(parents=True, exist_ok=True)
(LOG_DIR / filename).write_text("\n".join(stdout))
except Exception:
logger.warning(f"unable to save stdout to {filename}\n{traceback.format_exc()}")
with open(PACMAN_LOG, 'r') as pacman_log: with open(PACMAN_LOG, 'r') as pacman_log:
pacman_log.seek(log_anchor) pacman_log.seek(log_anchor)
log = pacman_log.read().split('\n') log = pacman_log.read().split('\n')
@ -145,6 +167,7 @@ def do_system_upgrade(debug=False, interactive=False) -> checkReport:
report = log_checker(stdout, log, debug=debug) report = log_checker(stdout, log, debug=debug)
except Exception: except Exception:
logger.exception('checker has crashed, here is the debug info') logger.exception('checker has crashed, here is the debug info')
logger.setLevel(logging.DEBUG)
_report = log_checker(stdout, log, debug=True) _report = log_checker(stdout, log, debug=True)
raise raise
@ -235,12 +258,15 @@ def main() -> None:
else: else:
logger.debug(f'needrestart {p.stdout=}') logger.debug(f'needrestart {p.stdout=}')
import argparse import argparse
parser = argparse.ArgumentParser(description='Pacman Automatic Rolling Helper') parser = argparse.ArgumentParser(description='Unattended Upgrades for Arch Linux')
parser.add_argument('action', choices=['run', 'status', 'fail-reset']) parser.add_argument('action', choices=['run', 'status', 'reset', 'fail-reset', 'reset-failed'],
help="what to do", metavar="run / status / reset ")
parser.add_argument('-d', '--debug', action='store_true', help='enable debug mode') parser.add_argument('-d', '--debug', action='store_true', help='enable debug mode')
parser.add_argument('-v', '--verbose', action='store_true', help='show verbose report') parser.add_argument('-v', '--verbose', action='store_true', help='show verbose report')
parser.add_argument('-m', '--max', type=int, default=1, help='Number of upgrades to show') parser.add_argument('-m', '--max', type=int, default=1, help='Number of upgrades to show')
parser.add_argument('-i', '--interactive', choices=['auto', 'on', 'off'], default='auto', help='allow interactive questions') parser.add_argument('-i', '--interactive', choices=['auto', 'on', 'off'],
default='auto', help='allow interactive questions',
metavar="auto / on / off ")
args = parser.parse_args() args = parser.parse_args()
if args.debug: if args.debug:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(module)s - %(funcName)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(module)s - %(funcName)s - %(levelname)s - %(message)s')
@ -301,7 +327,7 @@ def main() -> None:
print() print()
if failed: if failed:
exit(2) exit(2)
elif args.action == 'fail-reset': elif args.action in {'reset', 'fail-reset', 'reset-failed'}:
if getuid() != 0: if getuid() != 0:
logger.error('you need to be root') logger.error('you need to be root')
exit(1) exit(1)

View file

@ -1,7 +1,7 @@
import subprocess import subprocess
from threading import Thread from threading import Thread
import logging import logging
from typing import List, BinaryIO, Iterator from typing import List, BinaryIO, Iterator, Union
from io import DEFAULT_BUFFER_SIZE from io import DEFAULT_BUFFER_SIZE
from time import mktime from time import mktime
from datetime import datetime from datetime import datetime
@ -60,25 +60,16 @@ def execute_with_io(command: List[str], timeout: int = 3600, interactive: bool =
p.stdin.flush() p.stdin.flush()
elif line.lower().endswith('[y/n]'): elif line.lower().endswith('[y/n]'):
if interactive: if interactive:
print(f"Please answer this question in 60 seconds:\n{line}", end=' ', flush=True) choice = ask_interactive_question(line, info=output)
while True: if choice is None:
read_ready, _, _ = select([stdin], list(), list(), 60) terminate(p, signal=SIGINT)
if not read_ready: raise UnknownQuestionError(line, output)
terminate(p, signal=SIGINT) elif choice:
raise UnknownQuestionError(line, output) p.stdin.write('y\n')
choice = read_ready[0].readline().strip() p.stdin.flush()
if choice.lower().startswith('y'): else:
p.stdin.write('y\n') p.stdin.write('n\n')
p.stdin.flush() p.stdin.flush()
break
elif choice.lower().startswith('n'):
p.stdin.write('n\n')
p.stdin.flush()
break
else:
if choice.lower().startswith('s'):
print(output)
print("Please give an explicit answer [Y]es [N]o [S]how", end=' ', flush=True)
else: else:
terminate(p, signal=SIGINT) terminate(p, signal=SIGINT)
raise UnknownQuestionError(line, output) raise UnknownQuestionError(line, output)
@ -113,3 +104,20 @@ def back_readline(fp: BinaryIO) -> Iterator[str]:
previous = blines[0] previous = blines[0]
pos = next pos = next
yield blines.pop(-1).decode('utf-8') yield blines.pop(-1).decode('utf-8')
def ask_interactive_question(question: str = "", timeout: int = 60, info: str = "") -> Union[bool, None]:
''' on timeout, returns None '''
print(f"Please answer this question in {timeout} seconds:\n{question}", end=' ', flush=True)
while True:
read_ready, _, _ = select([stdin], list(), list(), timeout)
if not read_ready:
return None
choice = read_ready[0].readline().strip()
if choice.lower().startswith('y'):
return True
elif choice.lower().startswith('n'):
return False
else:
if info and choice.lower().startswith('i'):
print(info)
print(f"Please give an explicit answer [Y]es [N]o{' [I]nfo' if info else ''}", end=' ', flush=True)