repod: rewrite push

This commit is contained in:
JerryXiao 2019-09-06 14:59:45 +08:00
parent 0518ebf199
commit 96e76dfc40
Signed by: Jerry
GPG Key ID: 9D9CE43650FF2BAA
2 changed files with 144 additions and 82 deletions

View File

@ -87,7 +87,7 @@ class jobsManager:
ret += f'{myproperty}={getattr(self, myproperty, None)},' ret += f'{myproperty}={getattr(self, myproperty, None)},'
ret += ')' ret += ')'
return ret return ret
def reset_dir(self, pkgdirname=None, all=False): def reset_dir(self, pkgdirname=None, all=False, rmpkg=True):
if all: if all:
logger.info('resetting %s', str(REPO_ROOT)) logger.info('resetting %s', str(REPO_ROOT))
bash(GIT_RESET_SUBDIR, cwd=REPO_ROOT) bash(GIT_RESET_SUBDIR, cwd=REPO_ROOT)
@ -102,16 +102,36 @@ class jobsManager:
if fpath.is_dir() and \ if fpath.is_dir() and \
fpath.name in ('pkg', 'src'): fpath.name in ('pkg', 'src'):
rmtree(fpath) rmtree(fpath)
elif fpath.is_file() and \ elif rmpkg and fpath.is_file() and \
(fpath.name.endswith(PKG_SUFFIX) or \ (fpath.name.endswith(PKG_SUFFIX) or \
fpath.name.endswith(PKG_SIG_SUFFIX)): fpath.name.endswith(PKG_SIG_SUFFIX)):
fpath.unlink() fpath.unlink()
else: else:
return False return False
return True return True
def force_upload_package(self, pkgdirname, overwrite=True): def force_upload_package(self, pkgdirname, overwrite=False):
pass if not self.idle:
def rebuild_package(self, pkgdirname, clean=False): logger.debug('force_upload requested and not idle.')
if not (REPO_ROOT / pkgdirname).exists():
ret = f'force_upload failed: no such dir {pkgdirname}'
logger.warning(ret)
else:
self.pkgconfigs = load_all_yaml()
updates = updmgr.check_update(rebuild_package=pkgdirname)
if updates and len(updates) == 1:
(pkgconfig, ver, buildarchs) = updates[0]
fakejob = Job(buildarchs[0], pkgconfig, ver)
if self.__upload(fakejob, overwrite=overwrite):
ret = f'done force_upload {pkgdirname}'
logger.info(ret)
else:
ret = f'force_upload {pkgdirname} failed: return code.'
logger.warning(ret)
else:
ret = f'force_upload {pkgdirname} failed: cannot check update.'
logger.warning(ret)
return ret
def rebuild_package(self, pkgdirname, clean=True):
if not self.idle: if not self.idle:
logger.debug('rebuild requested and not idle.') logger.debug('rebuild requested and not idle.')
self.pkgconfigs = load_all_yaml() self.pkgconfigs = load_all_yaml()
@ -130,7 +150,7 @@ class jobsManager:
ret = f'rebuild job added for {pkgdirname} {" ".join(buildarchs)}' ret = f'rebuild job added for {pkgdirname} {" ".join(buildarchs)}'
logger.info(ret) logger.info(ret)
else: else:
ret = 'rebuild failed: cannot check update.' ret = f'rebuild {pkgdirname} failed: cannot check update.'
logger.warning(ret) logger.warning(ret)
return ret return ret
def _new_buildjob(self, job): def _new_buildjob(self, job):
@ -228,10 +248,10 @@ class jobsManager:
for fpath in cwd.iterdir(): for fpath in cwd.iterdir():
if fpath.name.endswith(PKG_SUFFIX): if fpath.name.endswith(PKG_SUFFIX):
bash(f'{GPG_SIGN_CMD} {fpath.name}', cwd=cwd) bash(f'{GPG_SIGN_CMD} {fpath.name}', cwd=cwd)
def __upload(self, job): def __upload(self, job, overwrite=False):
suc = True
cwd = REPO_ROOT / job.pkgconfig.dirname cwd = REPO_ROOT / job.pkgconfig.dirname
f_to_upload = list() f_to_upload = list()
pkg_update_list = list()
for fpath in cwd.iterdir(): for fpath in cwd.iterdir():
if fpath.name.endswith(PKG_SUFFIX) and \ if fpath.name.endswith(PKG_SUFFIX) and \
get_pkg_details_from_name(fpath.name).ver == job.version: get_pkg_details_from_name(fpath.name).ver == job.version:
@ -239,36 +259,33 @@ class jobsManager:
assert sigpath.exists() assert sigpath.exists()
f_to_upload.append(sigpath) f_to_upload.append(sigpath)
f_to_upload.append(fpath) f_to_upload.append(fpath)
pkg_update_list.append(fpath)
sizes = [f.stat().st_size / 1000 / 1000 for f in f_to_upload]
max_tries = 10
for tries in range(max_tries):
timeouts = rrun('push_start', args=(f_to_upload, sizes))
if type(timeouts) is list:
break
else:
if tries + 1 < max_tries:
logger.warning(f'Remote is busy ({timeouts}), wait 1 min x10 [{tries+1}/10]')
sleep(60)
else:
raise RuntimeError('Remote is busy and cannot connect')
assert len(f_to_upload) == len(timeouts)
pkgs_timeouts = {f_to_upload[i]:timeouts[i] for i in range(len(sizes))}
for f in f_to_upload: for f in f_to_upload:
max_tries = 5 max_tries = 5
for tries in range(max_tries): for tries in range(max_tries):
try: try:
size = f.stat().st_size / 1000 / 1000 timeout = pkgs_timeouts.get(f)
if f.name.endswith(PKG_SUFFIX):
for _ in range(10):
timeout = rrun('push_start', args=(f.name, size))
if timeout > 0:
break
else:
logger.warning('Remote is busy (-1), wait 1 min x10')
sleep(60)
else:
timeout = 60
logger.info(f'Uploading {f}, timeout in {timeout}s') logger.info(f'Uploading {f}, timeout in {timeout}s')
mon_bash(UPLOAD_CMD.format(src=f), seconds=int(timeout)) mon_bash(UPLOAD_CMD.format(src=f), seconds=int(timeout))
if f.name.endswith(PKG_SUFFIX):
logger.info(f'Requesting repo update for {f.name}')
res = rrun('push_done', args=(f.name,), kwargs={'overwrite': False,})
if res is None:
logger.info(f'Update success for {f.name}')
else:
logger.error(f'Update failed for {f.name}, reason: {res}')
suc = False
except Exception: except Exception:
time_to_sleep = (tries + 1) * 60 time_to_sleep = (tries + 1) * 60
logger.error(f'We are getting problem uploading {f}, wait {time_to_sleep} secs') logger.error(f'We are getting problem uploading {f}, wait {time_to_sleep} secs')
if not rrun('push_fail', args=(f.name,)): if not rrun('push_fail', args=(f.name,)):
logger.error('unable to run push_fail') logger.error('Unable to run push_fail')
print_exc_plus() print_exc_plus()
if tries + 1 < max_tries: if tries + 1 < max_tries:
sleep(time_to_sleep) sleep(time_to_sleep)
@ -277,7 +294,31 @@ class jobsManager:
else: else:
logger.error(f'Upload {f} failed, abort.') logger.error(f'Upload {f} failed, abort.')
raise RuntimeError('Unable to upload some files') raise RuntimeError('Unable to upload some files')
return suc logger.info(f'Requesting repo update for {pkg_update_list}')
res = "unexpected"
max_tries = 5
for tries in range(max_tries):
try:
res = rrun('push_done', args=(f_to_upload,), kwargs={'overwrite': overwrite,})
except Exception:
time_to_sleep = (tries + 1) * 60
logger.info(f'Error updating {pkg_update_list}, wait {time_to_sleep} secs')
print_exc_plus()
if tries + 1 < max_tries:
sleep(time_to_sleep)
else:
break
else:
ret = f'Update failed for {pkg_update_list}: max reties exceeded'
logger.error(ret)
raise RuntimeError(ret)
if res is None:
logger.info(f'Update success for {pkg_update_list}')
else:
ret = f'Update failed for {pkg_update_list}, reason: {res}'
logger.error(ret)
raise RuntimeError(ret)
return res is None
def getup(self): def getup(self):
''' '''
check for updates now !!! check for updates now !!!
@ -396,7 +437,8 @@ class updateManager:
logger.info(f'checking update: {pkg.dirname}') logger.info(f'checking update: {pkg.dirname}')
if self.__pkgerrs.get(pkg.dirname, 0) >= 2: if self.__pkgerrs.get(pkg.dirname, 0) >= 2:
logger.warning(f'package: {pkg.dirname} too many failures checking update') logger.warning(f'package: {pkg.dirname} too many failures checking update')
continue if rebuild_package is None:
continue
pkgbuild = pkgdir / 'PKGBUILD' pkgbuild = pkgdir / 'PKGBUILD'
archs = get_arch_from_pkgbuild(pkgbuild) archs = get_arch_from_pkgbuild(pkgbuild)
buildarchs = [BUILD_ARCH_MAPPING.get(arch, None) for arch in archs] buildarchs = [BUILD_ARCH_MAPPING.get(arch, None) for arch in archs]

124
repod.py
View File

@ -13,6 +13,7 @@ import os
from config import REPOD_BIND_ADDRESS, REPOD_BIND_PASSWD, REPO_PUSH_BANDWIDTH, \ from config import REPOD_BIND_ADDRESS, REPOD_BIND_PASSWD, REPO_PUSH_BANDWIDTH, \
GPG_VERIFY_CMD GPG_VERIFY_CMD
from shared_vars import PKG_SUFFIX, PKG_SIG_SUFFIX
@ -32,27 +33,34 @@ configure_logger(logger, logfile='repod.log', rotate_size=1024*1024*10, enable_n
class pushFm: class pushFm:
def __init__(self): def __init__(self):
self.fname = None self.fnames = list()
self.size = None self.size = None
self.sizes = None
self.start_time = None self.start_time = None
self.end_time = None self.end_time = None
def start(self, fname, size): def start(self, fnames, sizes):
''' '''
size is in MB sizes is list in MB
returns -1 when busy returns -1 when busy
''' '''
if self.is_busy(): if self.is_busy():
return -1 return -1
self.fname = fname self.fnames = fnames
self.start_time = time() self.start_time = time()
self.sizes = sizes
size = 0
for s in sizes:
size += s
self.size = size self.size = size
if size <= 7.5: def get_timeout(size):
timeout = 120 if size <= 7.5:
self.end_time = self.start_time + 120 timeout = 120
else: else:
timeout = size / (REPO_PUSH_BANDWIDTH / 8) * 2 timeout = size / (REPO_PUSH_BANDWIDTH / 8) * 2
self.end_time = self.start_time + timeout return timeout
return timeout timeouts = [get_timeout(s) for s in sizes]
self.end_time = self.start_time + get_timeout(self.size)
return timeouts
def tick(self): def tick(self):
''' '''
return None means success return None means success
@ -60,77 +68,89 @@ class pushFm:
''' '''
if self.is_busy(): if self.is_busy():
if time() > self.end_time: if time() > self.end_time:
ret = f'file {self.fname} is supposed to finish at {self.end_time}' ret = f'files {self.fnames} are supposed to finish at {self.end_time}'
self.__init__() self.__init__()
logger.error(f'pfm: {ret}') logger.error(f'tick: {ret}')
return ret return ret
else: else:
return None return None
else: else:
return None return None
def fail(self, fname): def fail(self, tfname):
update_path = Path('updates') update_path = Path('updates')
if fname == self.fname: if tfname in self.fnames:
pkg = update_path / self.fname for fname in self.fnames:
sig = update_path / f'{self.fname}.sig' pkg = update_path / fname
for f in (pkg, sig): sig = update_path / f'{fname}.sig'
if f.exists(): for f in (pkg, sig):
try: if f.exists():
f.unlink() try:
except Exception: f.unlink()
logger.warning(f'unable to remove {f.name}') except Exception:
logger.warning(f'unable to remove {f.name}')
self.__init__() self.__init__()
return None return None
else: else:
return "Wrong file" return "Wrong file"
def done(self, fname, overwrite=False): def done(self, fnames, overwrite=False):
''' '''
return None means success return None means success
else returns an error string else returns an error string
''' '''
if fname == self.fname: if [f for f in fnames if not (f.endswith(PKG_SUFFIX) or f.endswith(PKG_SIG_SUFFIX))]:
return "file to upload are garbage"
filter_sig = lambda fnames:[fname for fname in fnames if not fname.endswith(PKG_SIG_SUFFIX)]
if sorted(filter_sig(fnames)) == sorted(filter_sig(self.fnames)):
try: try:
update_path = Path('updates') update_path = Path('updates')
pkg_found = False for pkgfname in filter_sig(fnames):
sig_found = False pkg_found = False
for fpath in update_path.iterdir(): sig_found = False
if fpath.is_dir(): for fpath in update_path.iterdir():
continue if fpath.is_dir():
if fpath.name == self.fname: continue
pkg_found = fpath if fpath.name == pkgfname:
elif fpath.name == f'{self.fname}.sig': pkg_found = fpath
sig_found = fpath elif fpath.name == f'{pkgfname}.sig':
if pkg_found and sig_found: sig_found = fpath
try: if pkg_found and sig_found:
bash(f'{GPG_VERIFY_CMD} {sig_found} {pkg_found}')
except CalledProcessError:
print_exc_plus()
return 'GPG verify error'
else:
try: try:
if update(overwrite=overwrite): bash(f'{GPG_VERIFY_CMD} {sig_found} {pkg_found}')
return None except CalledProcessError:
except Exception: ret = f'{pkg_found} GPG verify error'
logger.error(ret)
print_exc_plus() print_exc_plus()
return 'update error' return ret
else:
try:
if update(overwrite=overwrite):
continue
else:
raise RuntimeError('update return false')
except Exception:
print_exc_plus()
return f'{pkg_found} update error'
else:
return f'file missing: pkg {pkg_found} sig {sig_found}'
return "unexpected error"
else: else:
return f'file missing: pkg {pkg_found} sig {sig_found}' # success
return "unexpected error" return None
finally: finally:
self.__init__() self.__init__()
else: else:
return "Wrong file" return "Wrong file"
def is_busy(self): def is_busy(self):
return not (self.fname is None) return bool(self.fnames)
pfm = pushFm() pfm = pushFm()
def push_start(filename, size): def push_start(filenames, sizes):
pfm.tick() pfm.tick()
return pfm.start(filename, size) return pfm.start(filenames, sizes)
def push_done(filename, overwrite=False): def push_done(filenames, overwrite=False):
return pfm.done(filename, overwrite=overwrite) return pfm.done(filenames, overwrite=overwrite)
def push_fail(filename): def push_fail(filename):
return pfm.fail(filename) return pfm.fail(filename)