#!/usr/bin/python3 # -*- mode: python; coding: utf-8 -*- # Miniature version of "dinstall", for installing .changes # into an archive # Copyright (c) 2002, 2003 Colin Walters # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os, sys, re, glob, getopt, time, traceback, lzma, getpass, socket import shutil, threading, select, queue, socketserver, datetime, subprocess import logging, logging.handlers import apt_pkg apt_pkg.init() from configparser import * from minidinstall.ChangeFile import * from minidinstall.Dnotify import * from minidinstall.DebianSigVerifier import * from minidinstall.GPGSigVerifier import * from minidinstall.version import * from minidinstall import misc, mail, tweet debchanges_re = re.compile('([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.changes$') debpackage_re = re.compile('([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.(d|u)?deb$') debbuildinfo_re = re.compile('([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.buildinfo$') debsrc_dsc_re = re.compile('([-a-z0-9+.]+)_(.+?)\.dsc$') debsrc_diff_re = re.compile('([-a-z0-9+.]+)_(.+?)\.diff\.gz$') debsrc_orig_re = re.compile('([-a-z0-9+.]+)_(.+?)\.orig[-a-z0-9]*\.tar\.(gz|bz2|lzma|xz)(\.asc)?$') debsrc_native_re = re.compile('([-a-z0-9+.]+)_(.+?)\.tar\.(gz|bz2|lzma|xz)$') native_version_re = re.compile('\s*.*-') toplevel_directory = None tmp_new_suffix = '.dinstall-new' tmp_old_suffix = '.dinstall-old' dinstall_subdir = 'mini-dinstall' incoming_subdir = 'incoming' socket_name = 'master' logfile_name = 'mini-dinstall.log' configfile_names = ['/etc/mini-dinstall.conf', '~/.mini-dinstall.conf'] use_dnotify = False mail_on_success = True tweet_on_success = False default_poll_time = 30 default_max_retry_time = 60 * 60 * 24 * 2 default_mail_log_level = logging.ERROR trigger_reindex = True mail_log_flush_level = logging.ERROR mail_log_flush_count = 10 mail_to = getpass.getuser() mail_server = 'localhost' incoming_permissions = 0o750 tweet_server = 'identica' tweet_user = None tweet_password = None default_architectures = ["all", "i386"] default_distributions = ["unstable"] distributions = {} hashes = ['sha256'] scantime = 60 mail_subject_template = "mini-dinstall: Successfully installed %(source)s %(version)s to %(distribution)s" mail_body_template = """ Package: %(source)s Maintainer: %(maintainer)s Changed-By: %(changed-by)s Changes: %(changes_without_dot)s """.lstrip() tweet_template = "Installed %(source)s %(version)s to %(distribution)s" def usage(ecode, ver_only=False): print("mini-dinstall", pkg_version) if ver_only: sys.exit(ecode) print(""" Copyright (c) 2002 Colin Walters Licensed under the GNU GPL. Usage: mini-dinstall [OPTIONS...] [DIRECTORY] Options: -v, --verbose Display extra information -q, --quiet Display less information -c, --config=FILE Parse configuration info from FILE -d, --debug Output information to stdout as well as log -f, --foreground Run daemon in foreground --no-log Don't write information to log file -n, --no-act Don't actually perform changes -b, --batch Don't daemonize; run once, then exit -r, --run Process queue immediately -k, --kill Kill the running mini-dinstall --no-db Disable lookups on package database --help What you're looking at --version Print the software version and exit """.strip()) sys.exit(ecode) try: (opts, args) = getopt.getopt(sys.argv[1:], 'vqc:dfnbrk', ['verbose', 'quiet', 'config=', 'debug', 'foreground', 'no-log', 'no-act', 'batch', 'run', 'kill', 'no-db', 'help', 'version']) except getopt.GetoptError as e: sys.stderr.write("Error reading arguments: %s\n" % e) usage(1) for (key, value) in opts: if key == '--help': usage(0) elif key == '--version': usage(0, ver_only=True) if len(args) > 1: sys.stderr.write("Unknown arguments: %s\n" % args[1:]) usage(1) # don't propagate exceptions that happen while logging logging.raiseExceptions = False logger = logging.getLogger("mini-dinstall") loglevel = logging.WARN no_act = False debug_mode = False run_mode = False kill_mode = False nodb_mode = False no_log = False foreground_mode = False batch_mode = False custom_config_files = False for (key, value) in opts: if key in ('-v', '--verbose'): if loglevel == logging.INFO: loglevel = logging.DEBUG elif loglevel == logging.WARN: loglevel = logging.INFO elif key in ('-q', '--quiet'): if loglevel == logging.WARN: loglevel = logging.ERROR elif loglevel == logging.WARN: loglevel = logging.CRITICAL elif key in ('-c', '--config'): if not custom_config_files: custom_config_files = True configfile_names = [] configfile_names.append(os.path.abspath(os.path.expanduser(value))) elif key in ('-n', '--no-act'): no_act = True elif key in ('-d', '--debug'): debug_mode = True elif key in ('--no-log',): no_log = True elif key in ('-b', '--batch'): batch_mode = True elif key in ('-f', '--foreground'): foreground_mode = True elif key in ('-r', '--run'): run_mode = True elif key in ('-k', '--kill'): kill_mode = True elif key in ('--no-db',): nodb_mode = True def do_and_log(msg, function, *args): try: logger.debug(msg) except: pass if not no_act: function(*args) def do_mkdir(name): if os.access(name, os.X_OK): return try: do_and_log('Creating directory "%s"' % name, os.mkdir, name) except OSError as e: print(e) exit(1) def do_rename(source, target): do_and_log('Renaming "%s" to "%s"' % (source, target), os.rename, source, target) def do_chmod(name, mode): if mode == 0: return do_and_log('Changing mode of "%s" to %o' % (name, mode), os.chmod, name, mode) logger.setLevel(logging.DEBUG) stderr_handler = logging.StreamHandler() stderr_handler.setLevel(loglevel) logger.addHandler(stderr_handler) stderr_handler.setLevel(loglevel) stderr_handler.setFormatter(logging.Formatter(fmt="%(name)s [%(thread)d] %(levelname)s: %(message)s")) configp = ConfigParser() configfile_names = [os.path.abspath(os.path.expanduser(x)) for x in configfile_names] logger.debug("Reading config files: %s" % configfile_names) configp.read(configfile_names) class SubjectSpecifyingLoggingSMTPHandler(logging.handlers.SMTPHandler): def __init__(self, *args, **kwargs): logging.handlers.SMTPHandler.__init__(*[self] + list(args) + ['dummy'], **kwargs) def setSubject(self, subject): self._subject = subject def getSubject(self, record): return re.sub('%l', record.levelname, self._subject) if not (configp.has_option('DEFAULT', 'mail_log_level') and configp.get('DEFAULT', 'mail_log_level') == 'NONE'): if configp.has_option('DEFAULT', 'mail_log_level'): mail_log_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_level')] else: mail_log_level = default_mail_log_level if configp.has_option('DEFAULT', 'mail_to'): mail_to = configp.get('DEFAULT', 'mail_to') if configp.has_option('DEFAULT', 'mail_server'): mail_server = configp.get('DEFAULT', 'mail_server') if configp.has_option('DEFAULT', 'mail_log_flush_count'): mail_log_flush_count = configp.getint('DEFAULT', 'mail_log_flush_count') if configp.has_option('DEFAULT', 'mail_log_flush_level'): mail_log_flush_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_flush_level')] mail_smtp_handler = SubjectSpecifyingLoggingSMTPHandler(mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(), socket.getfqdn()), [mail_to]) mail_smtp_handler.setSubject('mini-dinstall log notice (%l)') mail_handler = logging.handlers.MemoryHandler(mail_log_flush_count, flushLevel=mail_log_flush_level, target=mail_smtp_handler) mail_handler.setLevel(mail_log_level) logger.addHandler(mail_handler) if args: toplevel_directory = args[0] elif configp.has_option('DEFAULT', 'archivedir'): toplevel_directory = os.path.expanduser(configp.get('DEFAULT', 'archivedir')) else: logger.error("No archivedir specified on command line or in config files.") sys.exit(1) if configp.has_option('DEFAULT', 'incoming_permissions'): incoming_permissions = configp.getint('DEFAULT', 'incoming_permissions') do_mkdir(toplevel_directory) dinstall_subdir = os.path.join(toplevel_directory, dinstall_subdir) do_mkdir(dinstall_subdir) lockfilename = os.path.join(dinstall_subdir, 'mini-dinstall.lock') def process_exists(pid): try: os.kill(pid, 0) except OSError as e: return False return True if os.access(lockfilename, os.R_OK): pid = int(open(lockfilename).read()) if not process_exists(pid): if run_mode: logger.error("No process running at %d; use mini-dinstall -k to remove lockfile") sys.exit(1) logger.warn("No process running at %d, removing lockfile" % pid) os.unlink(lockfilename) if kill_mode: sys.exit(0) if not os.path.isabs(socket_name): socket_name = os.path.join(dinstall_subdir, socket_name) if run_mode or kill_mode: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) logger.debug('Connecting...') sock.connect(socket_name) if run_mode: logger.debug('Sending RUN command') sock.send('RUN\n'.encode('utf-8')) else: logger.debug('Sending DIE command') sock.send('DIE\n'.encode('utf-8')) logger.debug('Reading response') response = sock.recv(8192).decode('utf-8') print(response) sys.exit(0) if configp.has_option('DEFAULT', 'logfile'): logfile_name = configp.get('DEFAULT', 'logfile') if not no_log: if not os.path.isabs(logfile_name): logfile_name = os.path.join(dinstall_subdir, logfile_name) logger.debug("Adding log file: %s" % logfile_name) filehandler = logging.FileHandler(logfile_name) if loglevel == logging.WARN: filehandler.setLevel(logging.INFO) else: filehandler.setLevel(logging.DEBUG) logger.addHandler(filehandler) filehandler.setFormatter(logging.Formatter(fmt="%(asctime)s %(name)s [%(thread)d] %(levelname)s: %(message)s", datefmt="%b %d %H:%M:%S")) logger.info('Booting mini-dinstall %s' % pkg_version) class DinstallException(Exception): def __init__(self, value): self._value = value def __str__(self): return repr(self._value) if not configp.has_option('DEFAULT', 'archive_style'): logger.critical("You must set the default archive_style option (since version 0.4.0)") logging.shutdown() sys.exit(1) default_verify_sigs = os.access('/usr/share/keyrings/debian-keyring.gpg', os.R_OK) default_keyrings = [] default_extra_keyrings = [] if configp.has_option('DEFAULT', 'architectures'): default_architectures = re.split(', *', configp.get('DEFAULT', 'architectures')) if configp.has_option('DEFAULT', 'verify_sigs'): default_verify_sigs = configp.getboolean('DEFAULT', 'verify_sigs') if configp.has_option('DEFAULT', 'trigger_reindex'): default_trigger_reindex = configp.getboolean('DEFAULT', 'trigger_reindex') if configp.has_option('DEFAULT', 'poll_time'): default_poll_time = configp.getint('DEFAULT', 'poll_time') if configp.has_option('DEFAULT', 'max_retry_time'): default_max_retry_time = configp.getint('DEFAULT', 'max_retry_time') if configp.has_option('DEFAULT', 'expire_release_files'): expire_release_files = configp.getboolean('DEFAULT', 'expire_release_files') if configp.has_option('DEFAULT', 'keyids'): keyids = re.split(', *', configp.get('DEFAULT', 'keyids')) if configp.has_option('DEFAULT', 'keyrings'): default_keyrings = re.split(', *', configp.get('DEFAULT', 'keyrings')) if configp.has_option('DEFAULT', 'extra_keyrings'): default_extra_keyrings = re.split(', *', configp.get('DEFAULT', 'extra_keyrings')) if configp.has_option('DEFAULT', 'use_byhash'): use_byhash = configp.getboolean('DEFAULT', 'use_byhash') if configp.has_option('DEFAULT', 'use_dnotify'): use_dnotify = configp.getboolean('DEFAULT', 'use_dnotify') if configp.has_option('DEFAULT', 'mail_subject_template'): mail_subject_template = configp.get('DEFAULT', 'mail_subject_template') if configp.has_option('DEFAULT', 'mail_body_template'): mail_body_template = configp.get('DEFAULT', 'mail_body_template') if configp.has_option('DEFAULT', 'tweet_template'): tweet_template = configp.get('DEFAULT', 'tweet_template') if configp.has_option('DEFAULT', 'tweet_server'): tweet_server = configp.get('DEFAULT', 'tweet_server') if configp.has_option('DEFAULT', 'tweet_user'): tweet_user = configp.get('DEFAULT', 'tweet_user') if configp.has_option('DEFAULT', 'tweet_password'): tweet_password = configp.get('DEFAULT', 'tweet_password') sects = configp.sections() if sects: for sect in sects: distributions[sect] = {} if configp.has_option(sect, "architectures"): distributions[sect]["arches"] = re.split(', *', configp.get(sect, "architectures")) else: distributions[sect]["arches"] = default_architectures else: for dist in default_distributions: distributions[dist] = {"arches": default_architectures} class DistOptionHandler: def __init__(self, distributions, configp): self._configp = configp self._distributions = distributions user = getpass.getuser() self._optionmap = { 'alias': ['str', None], 'poll_time': ['int', default_poll_time], 'max_retry_time': ['int', default_max_retry_time], 'post_install_script': ['str', None], 'pre_install_script': ['str', None], 'dynamic_reindex': ['bool', True], 'restrict_changes_files': ['bool', None], 'chown_changes_files': ['bool', None], 'keep_old': ['bool', False], 'mail_on_success': ['bool', True], 'tweet_on_success': ['bool', False], 'archive_style': ['str', None], # Release file stuff 'generate_release': ['bool', False], 'release_origin': ['str', user], 'release_label': ['str', user], 'release_suite': ['str', None], 'release_codename': ['str', None], 'experimental_release': ['bool', False], 'backport_release': ['bool', False], 'release_description': ['str', None], 'release_signscript': ['str', None], 'keyids': ['list', None], 'keyrings': ['list', None], 'extra_keyrings': ['list', None], 'expire_release_files': ['bool', False], 'verify_sigs': ['bool', False], 'use_byhash': ['bool', True] } def get_option_map(self, dist): ret = self._distributions[dist] for (key, value) in list(self._optionmap.items()): if self._configp.has_option(dist, key): ret[key] = self.get_option(value[0], dist, key) elif self._configp.has_option('DEFAULT', key): ret[key] = self.get_option(value[0], 'DEFAULT', key) else: ret[key] = value[1] return ret def get_option(self, type, dist, key): if type == 'int': return self._configp.getint(dist, key) elif type == 'str': return self._configp.get(dist, key) elif type == 'list': return re.split(', *', self._configp.get(dist, key)) elif type == 'bool': return self._configp.getboolean(dist, key) assert None distoptionhandler = DistOptionHandler(distributions, configp) for dist in list(distributions.keys()): distributions[dist] = distoptionhandler.get_option_map(dist) if distributions[dist]['archive_style'] not in ('simple-subdir', 'flat'): raise DinstallException('Unknown archive style "%s"' % distributions[dist]['archive_style']) if distributions[dist]['chown_changes_files'] is not None: logger.warn("'chown_changes_files' is deprecated, please use 'restrict_changes_files' instead") if distributions[dist]['restrict_changes_files'] is None: distributions[dist]['restrict_changes_files'] = distributions[dist]['chown_changes_files'] if distributions[dist]['restrict_changes_files'] is None: distributions[dist]['restrict_changes_files'] = True logger.debug("Distributions: %s" % distributions) os.chdir(toplevel_directory) do_mkdir(dinstall_subdir) rejectdir = os.path.join(dinstall_subdir, 'REJECT') incoming_subdir = os.path.join(dinstall_subdir, incoming_subdir) do_mkdir(rejectdir) do_mkdir(incoming_subdir) do_chmod(incoming_subdir, incoming_permissions) ## IPC stuff # Used by all threads to determine whether or not they should exit die_event = threading.Event() # These global variables are used in IncomingDir::daemonize # I couldn't figure out any way to pass state to a BaseRequestHandler. reprocess_needed = threading.Event() reprocess_finished = threading.Event() reprocess_lock = threading.Lock() class IncomingDirRequestHandler(socketserver.StreamRequestHandler, socketserver.BaseRequestHandler): def handle(self): logger.debug('Got request from %s' % self.client_address) req = self.rfile.readline().strip().decode('utf-8') if req == 'RUN': logger.debug('Doing RUN command') reprocess_lock.acquire() reprocess_needed.set() logger.debug('Waiting on reprocessing') reprocess_finished.wait() reprocess_finished.clear() reprocess_lock.release() self.wfile.write('200 Reprocessing complete'.encode('utf-8')) elif req == 'DIE': logger.debug('Doing DIE command') self.wfile.write('200 Beginning shutdown'.encode('utf-8')) die_event.set() else: logger.debug('Got unknown command %s' % req) self.wfile.write('500 Unknown request'.encode('utf-8')) class ExceptionThrowingThreadedUnixStreamServer(socketserver.ThreadingUnixStreamServer): def handle_error(self, request, client_address): self._logger.exception("Unhandled exception during request processing; shutting down") die_event.set() class IncomingDir(threading.Thread): def __init__(self, dir, archivemap, logger, trigger_reindex=True, poll_time=30, max_retry_time=172800, batch_mode=False): threading.Thread.__init__(self, name="incoming") self._dir = dir self._archivemap = archivemap self._logger = logger self._trigger_reindex = trigger_reindex self._poll_time = poll_time self._batch_mode = batch_mode self._max_retry_time = max_retry_time self._last_failed_targets = {} self._eventqueue = queue.Queue() self._done_event = threading.Event() # ensure we always have some reprocess queue self._reprocess_queue = {} def run(self): self._logger.info('Created new installer thread (%s)' % self.getName()) self._logger.info('Entering batch mode...') initial_reprocess_queue = [] initial_fucked_list = [] try: for (changefilename, changefile) in self._get_changefiles(): if self._changefile_ready(changefilename, changefile): try: self._install_changefile(changefilename, changefile, 0) except Exception: logger.exception('Unable to install "%s"; adding to screwed list' % changefilename) initial_fucked_list.append(changefilename) else: self._logger.warn('Skipping "%s"; upload incomplete' % changefilename) initial_reprocess_queue.append(changefilename) if not self._batch_mode: self._daemonize(initial_reprocess_queue, initial_fucked_list) self._done_event.set() self._logger.info('All packages in incoming dir installed; exiting') except Exception as e: self._logger.exception("Unhandled exception; shutting down") die_event.set() self._done_event.set() return False def _abspath(self, *args): return os.path.abspath(os.path.join(*[self._dir] + list(args))) def _get_changefiles(self): ret = [] globpath = self._abspath("*.changes") self._logger.debug("glob: %s" % globpath) changefilenames = glob.glob(globpath) for changefilename in changefilenames: if changefilename not in self._reprocess_queue: self._logger.info('Examining "%s"' % changefilename) changefile = ChangeFile() try: changefile.load_from_file(changefilename) except ChangeFileException: self._logger.debug('Unable to parse "%s", skipping' % changefilename) continue ret.append((changefilename, changefile)) else: self._logger.debug('Skipping "%s" during new scan because it is in the reprocess queue.' % changefilename) return ret def _changefile_ready(self, changefilename, changefile): try: dist = changefile['distribution'] except KeyError as e: self._logger.warn('Unable to read distribution field for "%s"; data: %s' % (changefilename, changefile)) return False try: changefile.verify(self._abspath('')) except ChangeFileException: return False return True def _install_changefile(self, changefilename, changefile, doing_reprocess): changefiledist = changefile['distribution'] for dist in list(distributions.keys()): distributions[dist] = distoptionhandler.get_option_map(dist) if distributions[dist]['alias'] and changefiledist in distributions[dist]['alias']: logger.info('Distribution "%s" is an alias for "%s"' % (changefiledist, dist)) break else: dist = changefiledist if dist not in list(self._archivemap.keys()): raise DinstallException('Unknown distribution "%s" in "%s"' % (dist, changefilename)) logger.debug('Installing %s in archive %s' % (changefilename, self._archivemap[dist][1].getName())) self._archivemap[dist][0].install(changefilename, changefile) if self._trigger_reindex: if doing_reprocess: logger.debug('Waiting on archive %s to reprocess' % self._archivemap[dist][1].getName()) self._archivemap[dist][1].wait_reprocess() else: logger.debug('Notifying archive %s of change' % self._archivemap[dist][1].getName()) self._archivemap[dist][1].notify() logger.debug('Finished processing %s' % changefilename) def _reject_changefile(self, changefilename, changefile, e): dist = changefile['distribution'] if dist not in self._archivemap: raise DinstallException('Unknown distribution "%s" in "%s"' % (dist, changefilename)) self._archivemap[dist][0].reject(changefilename, changefile, e) def _daemon_server_isready(self): (inready, outready, exready) = select.select([self._server.fileno()], [], [], 0) return bool(inready) def _daemon_event_ispending(self): return die_event.isSet() or reprocess_needed.isSet() or self._daemon_server_isready() or not self._eventqueue.empty() def _daemon_reprocess_pending(self): curtime = time.time() for (starttime, nexttime, delay) in list(self._reprocess_queue.values()): if curtime >= nexttime: return True return False def _daemonize(self, init_reprocess_queue, init_fucked_list): self._logger.info('Entering daemon mode...') self._dnotify = DirectoryNotifierFactory().create([self._dir], use_dnotify=use_dnotify, poll_time=self._poll_time, cancel_event=die_event) self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name="Incoming watcher") self._async_dnotify.start() try: os.unlink(socket_name) except OSError as e: pass self._server = ExceptionThrowingThreadedUnixStreamServer(socket_name, IncomingDirRequestHandler) self._server.allow_reuse_address = True retry_time = 30 self._reprocess_queue = {} fucked = init_fucked_list doing_reprocess = False # Initialize the reprocessing queue for changefilename in init_reprocess_queue: curtime = time.time() self._reprocess_queue[changefilename] = [curtime, curtime, retry_time] # The main daemon loop while True: # Wait until we have something to do while not (self._daemon_event_ispending() or self._daemon_reprocess_pending()): time.sleep(0.5) self._logger.debug('Checking for pending server requests') if self._daemon_server_isready(): self._logger.debug('Handling one request') self._server.handle_request() self._logger.debug('Checking for DIE event') if die_event.isSet(): self._logger.debug('DIE event caught') break self._logger.debug('Scanning for changes') # do we have anything to reprocess? for (changefilename, (starttime, nexttime, delay)) in list(self._reprocess_queue.items()): curtime = time.time() try: changefile = ChangeFile() changefile.load_from_file(changefilename) except (ChangeFileException, IOError) as e: if not os.path.exists(changefilename): self._logger.info('Changes file "%s" got removed' % changefilename) else: self._logger.exception('Unable to load Changes file "%s"' % changefilename) self._logger.warn('Marking "%s" as screwed' % changefilename) fucked.append(changefilename) del self._reprocess_queue[changefilename] continue if curtime - starttime > self._max_retry_time: # We've tried too many times; reject it. self._reject_changefile(changefilename, changefile, DinstallException("Couldn't install \"%s\" in %d seconds" % (changefilename, self._max_retry_time))) elif curtime >= nexttime: if self._changefile_ready(changefilename, changefile): # Let's do it! self._logger.debug('Preparing to install "%s"' % changefilename) try: self._install_changefile(changefilename, changefile, doing_reprocess) self._logger.debug('Removing "%s" from incoming queue after successful install.' % changefilename) del self._reprocess_queue[changefilename] except Exception as e: logger.exception('Unable to install "%s"; adding to screwed list' % changefilename) fucked.append(changefilename) else: delay *= 2 if delay > 60 * 60: delay = 60 * 60 self._logger.info("Upload \"%s\" isn't complete; marking for retry in %d seconds" % (changefilename, delay)) self._reprocess_queue[changefilename][1:3] = [time.time() + delay, delay] # done reprocessing; now scan for changed dirs. relname = None self._logger.debug('Checking dnotify event queue') if not self._eventqueue.empty(): relname = os.path.basename(os.path.abspath(self._eventqueue.get())) self._logger.debug('Got %s from dnotify' % relname) if not (relname or doing_reprocess): if reprocess_needed.isSet(): self._logger.info('Got reprocessing event') reprocess_needed.clear() doing_reprocess = True else: self._logger.debug('No events to process') continue for (changefilename, changefile) in self._get_changefiles(): if changefilename in fucked: self._logger.warn('Skipping screwed Changes file "%s"' % changefilename) continue # Have we tried this changefile before? if changefilename not in self._reprocess_queue: self._logger.debug('New Changes file "%s"' % changefilename) if self._changefile_ready(changefilename, changefile): try: self._install_changefile(changefilename, changefile, doing_reprocess) except Exception as e: logger.exception('Unable to install "%s"; adding to screwed list' % changefilename) fucked.append(changefilename) else: curtime = time.time() self._logger.info("Upload \"%s\" isn't complete; marking for retry in %d seconds" % (changefilename, retry_time)) self._reprocess_queue[changefilename] = [curtime, curtime + retry_time, retry_time] if doing_reprocess: doing_reprocess = False self._logger.info('Reprocessing complete') reprocess_finished.set() def wait(self): self._done_event.wait() def parse_versions(fullversion): debianversion = re.sub('^[0-9]+:', '', fullversion) upstreamver = re.sub('-[^-]*$', '', debianversion) return (upstreamver, debianversion) class ArchiveDir: def __init__(self, dir, logger, configdict, batch_mode=False, keyrings=None, extra_keyrings=None, verify_sigs=False): self._dir = dir self._name = os.path.basename(os.path.abspath(dir)) self._logger = logger for (key, value) in list(configdict.items()): self._logger.debug('Setting "%s" => "%s" in archive "%s"' % ('_' + key, value, self._name)) self.__dict__['_' + key] = value do_mkdir(dir) self._batch_mode = batch_mode if 'verify_sigs' in configdict: self._verify_sigs = configdict['verify_sigs'] else: self._verify_sigs = verify_sigs if configdict['keyrings']: self._keyrings = configdict['keyrings'] else: self._keyrings = keyrings if configdict['extra_keyrings']: self._extra_keyrings = configdict['extra_keyrings'] else: self._extra_keyrings = extra_keyrings if self._mail_on_success: self._success_logger = logging.Logger("mini-dinstall." + self._name) self._success_logger.setLevel(logging.DEBUG) self.mailHandler = SubjectSpecifyingLoggingSMTPHandler(mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(), socket.getfqdn()), [mail_to]) self.mailHandler.setLevel(logging.DEBUG) self._success_logger.addHandler(self.mailHandler) self._clean_targets = [] def _abspath(self, *args): return os.path.abspath(os.path.join(*[self._dir] + list(args))) def _relpath(self, *args): return os.path.join(*[self._name] + list(args)) def install(self, changefilename, changefile): retval = False try: retval = self._install_run_scripts(changefilename, changefile) except Exception: self._logger.exception("Unhandled exception during installation") if not retval: self._logger.info('Failed to install "%s"' % changefilename) def reject(self, changefilename, changefile, reason): self._reject_changefile(changefilename, changefile, reason) def _install_run_scripts(self, changefilename, changefile): self._logger.info('Preparing to install "%s" in archive %s' % (changefilename, self._name)) sourcename = changefile['source'] version = changefile['version'] if self._verify_sigs: self._logger.info('Verifying signature on "%s"' % changefilename) try: if self._keyrings: verifier = DebianSigVerifier(keyrings=list(map(os.path.expanduser, self._keyrings)), extra_keyrings=self._extra_keyrings) else: verifier = DebianSigVerifier(extra_keyrings=self._extra_keyrings) output = verifier.verify(changefilename) logger.debug(output) logger.info('Good signature on "%s"' % changefilename) except GPGSigVerificationFailure as e: msg = 'Failed to verify signature on "%s": %s\n' % (changefilename, e) msg += ''.join(e.getOutput()) logger.error(msg) self._reject_changefile(changefilename, changefile, e) return False else: self._logger.debug('Skipping signature verification on "%s"' % changefilename) if self._pre_install_script: try: self._logger.debug("Running pre-installation script: %s" % self._pre_install_script) if self._run_script(os.path.abspath(changefilename), self._pre_install_script): return False except: self._logger.exception("failure while running pre-installation script") return False try: self._install_changefile_internal(changefilename, changefile) except Exception as e: self._logger.exception('Failed to process "%s"' % changefilename) self._reject_changefile(changefilename, changefile, e) return False if self._restrict_changes_files: do_chmod(changefilename, 0o600) target = os.path.join(self._dir, os.path.basename(changefilename)) # the final step do_rename(changefilename, target) self._logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name)) if self._mail_on_success: done = False missing_fields = [] if 'changes' in changefile: changefile['changes_without_dot'] = misc.format_changes(changefile['changes']) while not done: try: mail_subject = mail_subject_template % changefile mail_body = mail_body_template % changefile except KeyError as exc: key = exc.args[0] changefile[key] = '' missing_fields.append(key) else: done = True if missing_fields: mail_body = mail_body + "\n\nMissing Changes file fields: %s" % missing_fields mail.send(mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(), socket.getfqdn()), mail_to, mail_body, mail_subject) if self._tweet_on_success: done = False missing_fields = [] if 'changes' in changefile: changefile['changes_without_dot'] = misc.format_changes(changefile['changes']) while not done: try: tweet_body = tweet_template % changefile except KeyError as exc: key = exc.args[0] changefile[key] = '' missing_fields.append(key) else: done = True if missing_fields: tweet_body = tweet_body + "\n\n(errs: %s)" % missing_fields tweet.send(tweet_body, tweet_server, tweet_user, tweet_password) if self._post_install_script: try: self._logger.debug("Running post-installation script: %s" % self._post_install_script) self._run_script(target, self._post_install_script) except: self._logger.exception("failure while running post-installation script") return False return True def _install_changefile_internal(self, changefilename, changefile): sourcename = changefile['source'] version = changefile['version'] incomingdir = os.path.dirname(changefilename) newfiles = [] is_native = not native_version_re.match(version) if is_native: (ignored, newdebianver) = parse_versions(version) else: (newupstreamver, newdebianver) = parse_versions(version) is_sourceful = False for file in [x[2] for x in changefile.getFiles()]: match = debpackage_re.search(file) if match: arch = match.group(3) if arch not in self._arches: raise DinstallException("Unknown architecture: %s" % arch) target = self._arch_target(arch, file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), arch)) continue match = debbuildinfo_re.search(file) if match: arch = match.group(3) if arch not in self._arches: raise DinstallException("Unknown architecture: %s" % arch) target = self._arch_target(arch, file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), arch)) continue match = debsrc_dsc_re.search(file) or debsrc_diff_re.search(file) \ or debsrc_orig_re.search(file) or debsrc_native_re.search(file) if match: is_sourceful = True target = self._source_target(file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) all_arches = [] for arch in [x[3] for x in newfiles]: if arch not in all_arches: all_arches.append(arch) completed = [] oldfiles = [] if not self._keep_old: found_old_bins = False for (oldversion, oldarch) in [x[1:] for x in self._get_package_versions()]: if oldarch not in all_arches and apt_pkg.version_compare(oldversion, version) < 0: found_old_bins = True for (pkgname, arch) in [x[2:] for x in newfiles]: if arch == 'source' and found_old_bins: continue self._logger.debug('Scanning for old files') for file in self._read_arch_dir(arch): match = debpackage_re.search(file) or debbuildinfo_re.search(file) if not match: continue oldpkgname = match.group(1) oldarch = match.group(3) file = self._arch_target(arch, file) if file not in [x[0] for x in oldfiles]: target = file + tmp_old_suffix if oldpkgname == pkgname and oldarch == arch: oldfiles.append((file, target)) self._logger.debug('Scanning "%s" for old files' % self._abspath('source')) for file in self._read_source_dir(): file = self._source_target(file) if file not in [x[0] for x in oldfiles]: target = file + tmp_old_suffix match = debchanges_re.search(file) if not match and is_sourceful: match = debsrc_dsc_re.search(file) or debsrc_diff_re.search(file) if match and match.group(1) == sourcename: oldfiles.append((file, target)) continue # We skip the rest of this if it wasn't a # sourceful upload; really all we do if it isn't # is clean out old .changes files. if not is_sourceful: continue match = debsrc_orig_re.search(file) if match and match.group(1) == sourcename: if not is_native: (oldupstreamver, olddebianver) = parse_versions(match.group(2)) if apt_pkg.version_compare(oldupstreamver, newupstreamver) < 0: self._logger.debug('old upstream tarball "%s" version %s < %s, tagging for deletion' % (file, oldupstreamver, newupstreamver)) oldfiles.append((file, target)) continue else: self._logger.debug('keeping upstream tarball "%s" version %s' % (file, oldupstreamver)) continue else: self._logger.debug('old native tarball "%s", tagging for deletion' % file) oldfiles.append((file, target)) continue match = debsrc_native_re.search(file) if match and match.group(1) in [x[2] for x in newfiles]: oldfiles.append((file, target)) continue self._clean_targets = [x[1] for x in oldfiles] allrenames = oldfiles + [x[:2] for x in newfiles] try: for (oldname, newname) in allrenames: do_rename(oldname, newname) completed.append((oldname, newname)) except OSError as e: logger.exception("Failed to do rename (%s); attempting rollback" % e.strerror) try: self._logger.error(traceback.format_tb(sys.exc_info()[2])) except: pass # Unwind to previous state for (newname, oldname) in completed: do_rename(oldname, newname) raise self._clean_targets = [] # remove old files self.clean() def _run_script(self, changefilename, script): if script: script = os.path.expanduser(script) cmd = '%s %s' % (script, changefilename) self._logger.info('Running "%s"' % cmd) if not no_act: if not os.access(script, os.X_OK): self._logger.error("Can't execute script \"%s\"" % script) return True try: subprocess.check_call([script, changefilename]) except subprocess.CalledProcessError as e: self._logger.error('script "%s" exited with error code %d' % (cmd, e.returncode)) return True return False def _reject_changefile(self, changefilename, changefile, exception): sourcename = changefile['source'] version = changefile['version'] incomingdir = os.path.dirname(changefilename) try: f = open(os.path.join(rejectdir, "%s_%s.reason" % (sourcename, version)), 'w') if isinstance(exception, str): f.write(exception) else: traceback.print_exception(Exception, exception, None, None, f) f.close() for file in [x[2] for x in changefile.getFiles()]: if os.access(os.path.join(incomingdir, file), os.R_OK): file = os.path.join(incomingdir, file) else: file = self._abspath(file) target = os.path.join(rejectdir, os.path.basename(file)) do_rename(file, target) do_rename(changefilename, os.path.join(rejectdir, os.path.basename(changefilename))) self._logger.info('Rejecting "%s": %s' % (changefilename, repr(exception))) except Exception: self._logger.error("Unhandled exception while rejecting %s; archive may be in inconsistent state" % changefilename) raise def clean(self): self._logger.debug('Removing old files') for file in self._clean_targets: self._logger.debug('Deleting "%s"' % file) if not no_act: os.unlink(file) class SimpleSubdirArchiveDir(ArchiveDir): def __init__(self, *args, **kwargs): ArchiveDir.__init__(*[self] + list(args), **kwargs) for arch in list(self._arches) + ['source']: target = os.path.join(self._dir, arch) do_mkdir(target) def _read_source_dir(self): return os.listdir(self._abspath('source')) def _read_arch_dir(self, arch): return os.listdir(self._abspath(arch)) def _arch_target(self, arch, file): return self._abspath(arch, file) def _source_target(self, file): return self._arch_target('source', file) def _get_package_versions(self): ret = [] for arch in self._arches: for file in self._read_arch_dir(arch): match = debpackage_re.search(file) if match: ret.append((match.group(1), match.group(2), match.group(3))) return ret class FlatArchiveDir(ArchiveDir): def _read_source_dir(self): return os.listdir(self._dir) def _read_arch_dir(self, arch): return os.listdir(self._dir) def _arch_target(self, arch, file): return self._abspath(file) def _source_target(self, file): return self._arch_target('source', file) def _get_package_versions(self): ret = [] for file in self._abspath(''): match = debpackage_re.search(file) if match: ret.append((match.group(1), match.group(2), match.group(3))) return ret class ArchiveDirIndexer(threading.Thread): def __init__(self, dir, logger, configdict, use_dnotify=False, batch_mode=True): self._dir = dir self._name = os.path.basename(os.path.abspath(dir)) threading.Thread.__init__(self, name=self._name) self._logger = logger self._eventqueue = queue.Queue() for (key, value) in list(configdict.items()): self._logger.debug('Setting "%s" => "%s" in archive "%s"' % ('_' + key, value, self._name)) self.__dict__['_' + key] = value do_mkdir(dir) self._use_dnotify = use_dnotify self._batch_mode = batch_mode self._done_event = threading.Event() def _abspath(self, *args): return os.path.abspath(os.path.join(*[self._dir] + list(args))) def _relpath(self, *args): return os.path.join(*[self._name] + list(args)) def _make_indexfile(self, dir, type, name, arch=None): cmdline = ['apt-ftparchive', type, dir, '-o', 'APT::FTPArchive::AlwaysStat=true', '-o', 'APT::FTPArchive::SHA1=false', '-o', 'APT::FTPArchive::MD5=false'] if arch: cmdline.extend(['--arch', arch]) if not nodb_mode: cmdline.extend(['--db', '%s.db' % dir]) self._logger.debug("Running: %s" % ' '.join(cmdline)) if no_act: return if arch: packagesfilename = os.path.join(dir, '%s-%s' % (name, arch)) else: packagesfilename = os.path.join(dir, name) newpackagesfilename = packagesfilename + '.new' xzpackagesfilename = packagesfilename + '.xz' newxzpackagesfilename = newpackagesfilename + '.xz' newpackagesfile = open(newpackagesfilename, 'wb') newxzpackagesfile = lzma.open(newxzpackagesfilename, 'w') process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.join(self._dir, '..')) buf = process.stdout.read(8192) while buf: newpackagesfile.write(buf) newxzpackagesfile.write(buf) buf = process.stdout.read(8192) process.wait() if process.returncode != 0: raise DinstallException("apt-ftparchive exited with status code %d" % process.returncode) newpackagesfile.close() newxzpackagesfile.close() shutil.move(newpackagesfilename, packagesfilename) shutil.move(newxzpackagesfilename, xzpackagesfilename) if self._use_byhash: for hash in hashes: do_mkdir(os.path.join(dir, 'by-hash')) hashdir = os.path.join(dir, 'by-hash', hash.upper()) do_mkdir(hashdir) xzbyhash = self._get_file_sum(hash, xzpackagesfilename) shutil.copy(xzpackagesfilename, os.path.join(hashdir, xzbyhash)) mtime = lambda f: os.stat(os.path.join(hashdir, f)).st_mtime for oldbyhash in sorted(os.listdir(hashdir), key=mtime)[:-16]: self._logger.debug("Removing old by-hash file: %s" % oldbyhash) os.remove(os.path.join(hashdir, oldbyhash)) def _make_packagesfile(self, dir): self._make_indexfile(dir, 'packages', 'Packages') def _make_sourcesfile(self, dir): self._make_indexfile(dir, 'sources', 'Sources') def _make_contentsfile(self, dir, arch): self._make_indexfile(dir, 'contents', 'Contents', arch) def _sign_releasefile(self, name, dir): if self._release_signscript: try: self._logger.debug("Running release signing script: %s" % self._release_signscript) if self._run_script(name, self._release_signscript, dir=dir): return False except: self._logger.exception("failure while running Release signature script") return False return True # Copied from ArchiveDir def _run_script(self, changefilename, script, dir=None): if script: script = os.path.expanduser(script) cmd = '%s %s' % (script, changefilename) self._logger.info('Running "%s"' % cmd) if not no_act: if not os.access(script, os.X_OK): self._logger.error("Can't execute script \"%s\"" % script) return True try: subprocess.check_call([script, changefilename], cwd=dir) except subprocess.CalledProcessError as e: self._logger.error('script "%s" exited with error code %d' % (cmd, e.returncode)) return True return False def _get_file_sum(self, type, filename): ret = misc.get_file_sum(self, type, filename) if ret: return ret else: raise DinstallException('cannot compute hash of type %s; no builtin method or /usr/bin/%ssum', type, type) def _do_hash(self, hash, indexfiles, f): """ write hash digest into filehandle @param hash: used hash algorithm @param indexfiles: system architectures @param f: file handle """ f.write("%s%s:\n" % (hash.upper(), ['', 'Sum'][hash == 'md5'])) for file in indexfiles: absfile = self._abspath(file) h = self._get_file_sum(hash, absfile) size = os.stat(absfile)[stat.ST_SIZE] f.write(' %s% 16d %s\n' % (h, size, os.path.basename(absfile))) def _index_all(self, force=False): self._index(self._arches + ['source'], force) def _gen_release_all(self, force=False): self._gen_release(self._arches, force) def run(self): self._logger.info('Created new thread (%s) for archive indexer %s' % (self.getName(), self._name)) self._logger.info('Entering batch mode...') try: self._index_all(1) self._gen_release_all(True) if not self._batch_mode: # never returns self._daemonize() self._done_event.set() except Exception as e: self._logger.exception("Unhandled exception; shutting down") die_event.set() self._done_event.set() self._logger.info('Thread "%s" exiting' % self.getName()) def _daemon_event_ispending(self): return die_event.isSet() or not self._eventqueue.empty() def _daemonize(self): self._logger.info('Entering daemon mode...') if self._dynamic_reindex: self._dnotify = DirectoryNotifierFactory().create(self._get_dnotify_dirs(), use_dnotify=self._use_dnotify, poll_time=self._poll_time, cancel_event=die_event) self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name=self._name + " Indexer") self._async_dnotify.start() # The main daemon loop while True: # Wait until we have a pending event while not self._daemon_event_ispending(): time.sleep(1) if die_event.isSet(): break self._logger.debug('Reading from event queue') setevent = None dir = None obj = self._eventqueue.get() if isinstance(obj, str): self._logger.debug('got dir change') dir = obj elif not obj: self._logger.debug('got general event') setevent = None elif obj.__class__ == threading.Event().__class__: self._logger.debug('got wait_reprocess event') setevent = obj else: self._logger.error("unknown object %s in event queue" % obj) assert None # This is to protect against both lots of activity, and to # prevent race conditions, so we can rely on timestamps. time.sleep(1) if not self._reindex_needed(): if setevent: self._logger.debug('setting wait_reprocess event') setevent.set() continue if not dir: self._logger.debug('Got general change') self._index_all(1) self._gen_release_all(True) else: self._logger.debug('Got change in %s' % dir) self._index([os.path.basename(os.path.abspath(dir))]) self._gen_release([os.path.basename(os.path.abspath(dir))]) if setevent: self._logger.debug('setting wait_reprocess event') setevent.set() def _reindex_needed(self): if os.access(self._abspath('Release.gpg'), os.R_OK): gpg_mtime = os.stat(self._abspath('Release.gpg'))[stat.ST_MTIME] for dir in self._get_dnotify_dirs(): if os.stat(self._abspath(dir))[stat.ST_MTIME] > gpg_mtime: return True else: return True return False def _index(self, arches, force=False): self._index_impl(arches, force) def _gen_release(self, arches, force=False): self._gen_release_impl(self._arches, force) def wait_reprocess(self): e = threading.Event() self._eventqueue.put(e) self._logger.debug('waiting on reprocess') while not (e.isSet() or die_event.isSet()): time.sleep(0.5) self._logger.debug('done waiting on reprocess') def wait(self): self._done_event.wait() def notify(self): self._eventqueue.put(None) class SimpleSubdirArchiveDirIndexer(ArchiveDirIndexer): def __init__(self, *args, **kwargs): ArchiveDirIndexer.__init__(*[self] + list(args), **kwargs) for arch in list(self._arches) + ['source']: target = os.path.join(self._dir, arch) do_mkdir(target) def _index_impl(self, arches, force=False): for arch in arches: dirmtime = os.stat(self._relpath(arch))[stat.ST_MTIME] if arch != 'source': pkgsfile = self._relpath(arch, 'Packages') if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Packages file for %s...' % arch) self._make_packagesfile(self._relpath(arch)) self._logger.info('Packages generation complete') else: self._logger.info('Skipping generation of Packages file for %s' % arch) else: pkgsfile = self._relpath(arch, 'Sources') if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Sources file for %s...' % arch) self._make_sourcesfile(self._relpath('source')) self._logger.info('Sources generation complete') else: self._logger.info('Skipping generation of Sources file for %s' % arch) def _gen_release_impl(self, arches, force=False): for arch in arches: targetname = self._relpath(arch, 'Release') if not self._generate_release: if os.access(targetname, os.R_OK): self._logger.info("Release generation disabled, removing existing Release file") try: os.unlink(targetname) except OSError as e: pass return tmpname = targetname + tmp_new_suffix uncompr_indexfile = os.path.join(arch, 'Packages') indexfiles = [uncompr_indexfile] comprexts = ['.xz'] for ext in comprexts: indexfiles.append(uncompr_indexfile + ext) if not self._release_needed(targetname, indexfiles): self._logger.info("Skipping Release generation") continue self._logger.info("Generating Release...") if no_act: self._logger.info("Release generation complete") return f = open(tmpname, 'w') f.write('Origin: %s\n' % self._release_origin) f.write('Label: %s\n' % self._release_label) suite = self._release_suite if not suite: suite = self._name f.write('Suite: %s\n' % suite) codename = self._release_codename if not codename: codename = suite f.write('Codename: %s/%s\n' % (codename, arch)) if self._experimental_release: f.write('NotAutomatic: yes\n') elif self._backport_release: f.write('NotAutomatic: yes\n') f.write('ButAutomaticUpgrades: yes\n') f.write('Date: %s\n' % time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime())) if self._expire_release_files or self._keyids: f.write('Valid-Until: %s\n' % (datetime.datetime.utcnow() + datetime.timedelta(days=28)).strftime("%a, %d %b %Y %H:%M:%S UTC")) f.write('Architectures: %s\n' % arch) if self._keyids: f.write('Signed-By: %s\n' % ','.join(self._keyids)) if self._use_byhash: f.write('Acquire-By-Hash: yes\n') if self._release_description: f.write('Description: %s\n' % self._release_description) for hash in hashes: self._do_hash(hash, indexfiles, f) f.close() if self._sign_releasefile(os.path.basename(tmpname), self._abspath(arch)): os.rename(tmpname, targetname) self._logger.info("Release generation complete") def _in_archdir(self, *args): return (lambda x, self=self: self._abspath(x))(*args) def _get_dnotify_dirs(self): return list(map(lambda x, self=self: self._abspath(x), self._arches + ['source'])) def _get_all_indexfiles(self): return [os.path.join(arch, 'Packages') for arch in self._arches] + ['source/Sources'] def _release_needed(self, targetname, indexfiles): if os.access(targetname, os.R_OK): release_mtime = os.stat(targetname)[stat.ST_MTIME] for file in indexfiles: if os.stat(self._abspath(file))[stat.ST_MTIME] > release_mtime: return True else: return True return False class FlatArchiveDirIndexer(ArchiveDirIndexer): def __init__(self, *args, **kwargs): ArchiveDirIndexer.__init__(*[self] + list(args), **kwargs) def _index_impl(self, arches, force=False): pkgsfile = self._abspath('Packages') dirmtime = os.stat(self._relpath())[stat.ST_MTIME] if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Packages file...') self._make_packagesfile(self._relpath()) self._logger.info('Packages generation complete') else: self._logger.info('Skipping generation of Packages file') pkgsfile = self._abspath('Sources') if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Sources file...') self._make_sourcesfile(self._relpath()) self._logger.info('Sources generation complete') else: self._logger.info('Skipping generation of Sources file') for arch in self._arches: if arch in ("all", "source"): continue pkgsfile = self._abspath('Contents-%s' % arch) if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Contents file...') self._make_contentsfile(self._relpath(), arch) self._logger.info('Contents generation complete') else: self._logger.info('Skipping generation of Contents file') def _gen_release_impl(self, arches, force=False): targetname = self._abspath('Release') if not self._generate_release: if os.access(targetname, os.R_OK): self._logger.info("Release generation disabled, removing existing Release file") try: os.unlink(targetname) except OSError as e: pass return tmpname = targetname + tmp_new_suffix uncompr_indexfiles = self._get_all_indexfiles() indexfiles = [] comprexts = ['.xz'] for index in uncompr_indexfiles: indexfiles.append(index) for ext in comprexts: indexfiles.append(index + ext) if not self._release_needed(targetname, indexfiles): self._logger.info("Skipping Release generation") return self._logger.info("Generating Release...") if no_act: self._logger.info("Release generation complete") return f = open(tmpname, 'w') f.write('Origin: %s\n' % self._release_origin) f.write('Label: %s\n' % self._release_label) suite = self._release_suite if not suite: suite = self._name f.write('Suite: %s\n' % suite) codename = self._release_codename if not codename: codename = suite f.write('Codename: %s\n' % codename) if self._experimental_release: f.write('NotAutomatic: yes\n') elif self._backport_release: f.write('NotAutomatic: yes\n') f.write('ButAutomaticUpgrades: yes\n') f.write('Date: %s\n' % time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime())) if self._expire_release_files or self._keyids: f.write('Valid-Until: %s\n' % (datetime.datetime.utcnow() + datetime.timedelta(days=28)).strftime("%a, %d %b %Y %H:%M:%S UTC")) f.write('Architectures: %s\n' % ' '.join(self._arches)) if self._keyids: f.write('Signed-By: %s\n' % ','.join(self._keyids)) if self._use_byhash: f.write('Acquire-By-Hash: yes\n') if self._release_description: f.write('Description: %s\n' % self._release_description) for hash in hashes: self._do_hash(hash, indexfiles, f) f.close() if self._sign_releasefile(tmpname, self._abspath()): os.rename(tmpname, targetname) self._logger.info("Release generation complete") def _in_archdir(self, *args): return (lambda x, self=self: self._abspath(x))(*args[1:]) def _get_dnotify_dirs(self): return [self._dir] def _get_all_indexfiles(self): allindexes = [] for arch in self._arches: if arch in ("all", "source"): continue allindexes.append("Contents-%s" % arch) return ['Packages', 'Sources'] + allindexes def _release_needed(self, targetname, indexfiles): if os.access(targetname, os.R_OK): release_mtime = os.stat(targetname)[stat.ST_MTIME] for file in indexfiles: if os.stat(self._abspath(file))[stat.ST_MTIME] > release_mtime: return True else: return True return False if os.access(lockfilename, os.R_OK): logger.critical('lockfile "%s" exists (pid %s): is another mini-dinstall running?' % (lockfilename, open(lockfilename).read(10))) logging.shutdown() sys.exit(1) logger.debug('Creating lock file: %s' % lockfilename) if not no_act: lockfile = open(lockfilename, 'w') lockfile.close() if not (batch_mode or foreground_mode): # daemonize logger.debug("Daemonizing...") if os.fork() == 0: os.setsid() if os.fork() != 0: sys.exit(0) else: sys.exit(0) sys.stdin.close() sys.stdout.close() sys.stderr.close() os.close(0) os.close(1) os.close(2) # unix file descriptor allocation ensures that the following are fd 0,1,2 sys.stdin = open("/dev/null") sys.stdout = open("/dev/null") sys.stderr = open("/dev/null") logger.debug("Finished daemonizing (pid %d)" % os.getpid()) if foreground_mode: logger.debug("Running in foreground...") lockfile = open(lockfilename, 'w') lockfile.write(str(os.getpid())) lockfile.close() if not (debug_mode or batch_mode): # Don't log to stderr past this point logger.removeHandler(stderr_handler) archivemap = {} # Instantiate archive classes for installing files for (dist, value) in list(distributions.items()): if value['archive_style'] == 'simple-subdir': archdir = SimpleSubdirArchiveDir else: archdir = FlatArchiveDir archivemap[dist] = [archdir(dist, logger, value, batch_mode=batch_mode, keyrings=default_keyrings, extra_keyrings=default_extra_keyrings, verify_sigs=default_verify_sigs), None] # Create archive indexing threads, but don't start them yet for (dist, value) in list(distributions.items()): targetdir = os.path.join(toplevel_directory, dist) logger.info('Initializing archive indexer %s' % dist) if value['archive_style'] == 'simple-subdir': archdiridx = SimpleSubdirArchiveDirIndexer else: archdiridx = FlatArchiveDirIndexer archivemap[dist][1] = archdiridx(targetdir, logger, value, use_dnotify=use_dnotify, batch_mode=batch_mode) # Now: kick off the incoming processor logger.info('Initializing incoming processor') incoming = IncomingDir(incoming_subdir, archivemap, logger, trigger_reindex=trigger_reindex, poll_time=default_poll_time, max_retry_time=default_max_retry_time, batch_mode=batch_mode) logger.debug('Starting incoming processor') incoming.start() if batch_mode: logger.debug('Waiting for incoming processor to finish') incoming.wait() # Once we've installed everything, start the indexing threads for dist in list(distributions.keys()): archive = archivemap[dist][1] logger.debug('Starting archive %s' % archive.getName()) archive.start() # Wait for all the indexing threads to finish; none of these ever # return if we're in daemon mode if batch_mode: for dist in list(distributions.keys()): archive = archivemap[dist][1] logger.debug('Waiting for archive %s to finish' % archive.getName()) archive.wait() else: logger.debug("Waiting for die event") die_event.wait() logger.info('Die event caught; waiting for incoming processor to finish') incoming.wait() for dist in list(distributions.keys()): archive = archivemap[dist][1] logger.info('Die event caught; waiting for archive %s to finish' % archive.getName()) archive.wait() logger.debug('Removing lock file: %s' % lockfilename) os.unlink(lockfilename) logger.info("main thread exiting...") sys.exit(0) # vim:ts=4:sw=4:et: