#!/usr/bin/python3
# -*- mode: python; coding: utf-8 -*-
# Miniature version of "dinstall", for installing .changes
# into an archive
# Copyright (c) 2002, 2003 Colin Walters <walters@gnu.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os, sys, re, glob, getopt, stat, time, traceback, lzma, getpass, socket
import shutil, threading, select, queue, socketserver, datetime, subprocess
import logging, logging.handlers
import apt_pkg
apt_pkg.init()
from configparser import *
from minidinstall.ChangeFile import *
from minidinstall.Dnotify import *
from minidinstall.DebianSigVerifier import *
from minidinstall.GPGSigVerifier import *
from minidinstall.version import *
from minidinstall import misc, mail, tweet
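# Filename patterns for Debian upload artifacts, e.g. "hello_2.10-3_amd64.deb"
# or "hello_2.10-3_amd64.changes": group 1 is the source/package name, group 2
# the version, group 3 (where present) the architecture.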
debchanges_re = re.compile(r'([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.changes$')
debpackage_re = re.compile(r'([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.(d|u)?deb$')
debbuildinfo_re = re.compile(r'([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.buildinfo$')
debsrc_dsc_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.dsc$')
debsrc_diff_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.diff\.gz$')
debsrc_orig_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.orig[-a-z0-9]*\.tar\.(gz|bz2|lzma|xz)(\.asc)?$')
debsrc_native_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.tar\.(gz|bz2|lzma|xz)$')
native_version_re = re.compile(r'\s*.*-')
toplevel_directory = None
tmp_new_suffix = '.dinstall-new'
tmp_old_suffix = '.dinstall-old'
dinstall_subdir = 'mini-dinstall'
incoming_subdir = 'incoming'
socket_name = 'master'
logfile_name = 'mini-dinstall.log'
configfile_names = ['/etc/mini-dinstall.conf', '~/.mini-dinstall.conf']
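# A minimal configuration might look like:
#   [DEFAULT]
#   archivedir = ~/debian
#   archive_style = flat
#   [unstable]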
use_dnotify = False
mail_on_success = True
tweet_on_success = False
default_poll_time = 30
default_max_retry_time = 60 * 60 * 24 * 2
trigger_reindex = True
mail_log_level = 'ERROR'
mail_log_flush_level = 'ERROR'
mail_log_flush_count = 10
mail_to = getpass.getuser()
mail_server = 'localhost'
incoming_permissions = 0o750
tweet_server = 'identica'
tweet_user = None
tweet_password = None
default_architectures = ["all", "i386"]
default_distributions = ["unstable"]
distributions = {}
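# Digest algorithms emitted in Release files; MD5 and SHA-1 output from
# apt-ftparchive is explicitly disabled when the indices are generated.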
hashes = ['sha256']
scantime = 60
mail_subject_template = "mini-dinstall: Successfully installed %(source)s %(version)s to %(distribution)s"
mail_body_template = """
Package: %(source)s
Maintainer: %(maintainer)s
Changed-By: %(changed-by)s
Changes:
%(changes_without_dot)s
""".lstrip()
tweet_template = "Installed %(source)s %(version)s to %(distribution)s"
def usage(ecode, ver_only=False):
print("mini-dinstall", pkg_version)
if ver_only:
sys.exit(ecode)
print("""
Copyright (c) 2002 Colin Walters <walters@gnu.org>
Licensed under the GNU GPL.
Usage: mini-dinstall [OPTIONS...] [DIRECTORY]
Options:
-v, --verbose Display extra information
-q, --quiet Display less information
-c, --config=FILE Parse configuration info from FILE
-d, --debug Output information to stdout as well as log
-f, --foreground Run daemon in foreground
--no-log Don't write information to log file
-n, --no-act Don't actually perform changes
-b, --batch Don't daemonize; run once, then exit
-r, --run Process queue immediately
-k, --kill Kill the running mini-dinstall
--no-db Disable lookups on package database
--help What you're looking at
--version Print the software version and exit
""".strip())
sys.exit(ecode)
try:
(opts, args) = getopt.getopt(sys.argv[1:], 'vqc:dfnbrk',
['verbose', 'quiet', 'config=', 'debug', 'foreground', 'no-log',
'no-act', 'batch', 'run', 'kill', 'no-db', 'help', 'version'])
except getopt.GetoptError as e:
sys.stderr.write("Error reading arguments: %s\n" % e)
usage(1)
for (key, value) in opts:
if key == '--help':
usage(0)
elif key == '--version':
usage(0, ver_only=True)
if len(args) > 1:
sys.stderr.write("Unknown arguments: %s\n" % args[1:])
usage(1)
# don't propagate exceptions that happen while logging
logging.raiseExceptions = False
logger = logging.getLogger("mini-dinstall")
loglevel = logging.WARN
no_act = False
debug_mode = False
run_mode = False
kill_mode = False
nodb_mode = False
no_log = False
foreground_mode = False
batch_mode = False
custom_config_files = False
for (key, value) in opts:
if key in ('-v', '--verbose'):
if loglevel == logging.INFO:
loglevel = logging.DEBUG
elif loglevel == logging.WARN:
loglevel = logging.INFO
    elif key in ('-q', '--quiet'):
        if loglevel == logging.ERROR:
            loglevel = logging.CRITICAL
        elif loglevel == logging.WARN:
            loglevel = logging.ERROR
elif key in ('-c', '--config'):
if not custom_config_files:
custom_config_files = True
configfile_names = []
configfile_names.append(os.path.abspath(os.path.expanduser(value)))
elif key in ('-n', '--no-act'):
no_act = True
elif key in ('-d', '--debug'):
debug_mode = True
elif key in ('--no-log',):
no_log = True
elif key in ('-b', '--batch'):
batch_mode = True
elif key in ('-f', '--foreground'):
foreground_mode = True
elif key in ('-r', '--run'):
run_mode = True
elif key in ('-k', '--kill'):
kill_mode = True
elif key in ('--no-db',):
nodb_mode = True
def do_and_log(msg, function, *args):
try:
logger.debug(msg)
except:
pass
if not no_act:
function(*args)
def do_mkdir(name):
if os.access(name, os.X_OK):
return
try:
do_and_log('Creating directory "%s"' % name, os.mkdir, name)
except OSError as e:
logger.error(e)
exit(1)
def do_rename(source, target):
    # Copy+remove rather than os.rename() so moves work across filesystems.
    do_and_log('Renaming "%s" to "%s"' % (source, target), shutil.copy, source, target)
    if not no_act:
        os.remove(source)
def do_chmod(name, mode):
if mode == 0:
return
do_and_log('Changing mode of "%s" to %o' % (name, mode), os.chmod, name, mode)
logger.setLevel(logging.DEBUG)
stderr_handler = logging.StreamHandler()
stderr_handler.setLevel(loglevel)
logger.addHandler(stderr_handler)
stderr_handler.setFormatter(logging.Formatter(fmt="%(name)s [%(thread)d] %(levelname)s: %(message)s"))
configp = ConfigParser()
configfile_names = [os.path.abspath(os.path.expanduser(x)) for x in configfile_names]
logger.debug("Reading config files: %s" % configfile_names)
configp.read(configfile_names)
class SubjectSpecifyingLoggingSMTPHandler(logging.handlers.SMTPHandler):
    def __init__(self, *args, **kwargs):
        # The real subject is supplied later via setSubject(); pass a placeholder.
        logging.handlers.SMTPHandler.__init__(self, *args, 'dummy', **kwargs)
def setSubject(self, subject):
self._subject = subject
def getSubject(self, record):
return re.sub('%l', record.levelname, self._subject)
if configp.has_option('DEFAULT', 'mail_log_level'):
mail_log_level = configp.get('DEFAULT', 'mail_log_level')
if configp.has_option('DEFAULT', 'mail_on_success'):
    mail_on_success = configp.getboolean('DEFAULT', 'mail_on_success')
if mail_log_level != 'NONE' or mail_on_success:
if configp.has_option('DEFAULT', 'mail_to'):
mail_to = configp.get('DEFAULT', 'mail_to')
if configp.has_option('DEFAULT', 'mail_server'):
mail_server = configp.get('DEFAULT', 'mail_server')
if mail_log_level != 'NONE':
if configp.has_option('DEFAULT', 'mail_log_flush_count'):
mail_log_flush_count = configp.getint('DEFAULT', 'mail_log_flush_count')
if configp.has_option('DEFAULT', 'mail_log_flush_level'):
mail_log_flush_level = configp.get('DEFAULT', 'mail_log_flush_level')
mail_smtp_handler = SubjectSpecifyingLoggingSMTPHandler(mail_server, 'Mini-Dinstall <%s@%s>' % (
getpass.getuser(), socket.getfqdn()), [mail_to])
mail_smtp_handler.setSubject('mini-dinstall log notice (%l)')
mail_handler = logging.handlers.MemoryHandler(mail_log_flush_count,
flushLevel=logging.__dict__[mail_log_flush_level], target=mail_smtp_handler)
mail_handler.setLevel(logging.__dict__[mail_log_level])
logger.addHandler(mail_handler)
if args:
toplevel_directory = args[0]
elif configp.has_option('DEFAULT', 'archivedir'):
toplevel_directory = os.path.expanduser(configp.get('DEFAULT', 'archivedir'))
else:
logger.error("No archivedir specified on command line or in config files.")
sys.exit(1)
toplevel_directory = os.path.abspath(toplevel_directory)
if configp.has_option('DEFAULT', 'incoming_permissions'):
incoming_permissions = int(configp.get('DEFAULT', 'incoming_permissions'), 8)
do_mkdir(toplevel_directory)
dinstall_subdir = os.path.join(toplevel_directory, dinstall_subdir)
do_mkdir(dinstall_subdir)
lockfilename = os.path.join(dinstall_subdir, 'mini-dinstall.lock')
def process_exists(pid):
    # Signal 0 sends no signal but still performs the pid-existence check.
    try:
        os.kill(pid, 0)
    except OSError:
        return False
    return True
if os.access(lockfilename, os.R_OK):
    with open(lockfilename) as f:
        pid = f.read().strip()
error = ''
if not pid:
error = "Lockfile present but empty; use mini-dinstall -k to remove it"
warn = "Lockfile present but empty, removing it"
else:
pid = int(pid)
if not process_exists(pid):
error = "No process running at %d; use mini-dinstall -k to remove lockfile" % pid
warn = "No process running at %d, removing lockfile" % pid
if error:
if run_mode:
logger.error(error)
sys.exit(1)
logger.warning(warn)
os.unlink(lockfilename)
if kill_mode:
sys.exit(0)
if not os.path.isabs(socket_name):
socket_name = os.path.join(dinstall_subdir, socket_name)
if run_mode or kill_mode:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
logger.debug('Connecting...')
sock.connect(socket_name)
    if run_mode:
        logger.debug('Sending RUN command')
        sock.sendall(b'RUN\n')
    else:
        logger.debug('Sending DIE command')
        sock.sendall(b'DIE\n')
logger.debug('Reading response')
response = sock.recv(8192).decode('utf-8')
print(response)
sys.exit(0)
if configp.has_option('DEFAULT', 'logfile'):
logfile_name = configp.get('DEFAULT', 'logfile')
if not no_log:
if not os.path.isabs(logfile_name):
logfile_name = os.path.join(dinstall_subdir, logfile_name)
logger.debug("Adding log file: %s" % logfile_name)
filehandler = logging.FileHandler(logfile_name)
if loglevel == logging.WARN:
filehandler.setLevel(logging.INFO)
else:
filehandler.setLevel(logging.DEBUG)
logger.addHandler(filehandler)
filehandler.setFormatter(logging.Formatter(fmt="%(asctime)s %(name)s [%(thread)d] %(levelname)s: %(message)s", datefmt="%b %d %H:%M:%S"))
logger.info('Booting mini-dinstall %s' % pkg_version)
class DinstallException(Exception):
def __init__(self, value):
self._value = value
def __str__(self):
return repr(self._value)
if not configp.has_option('DEFAULT', 'archive_style'):
logger.critical("You must set the default archive_style option (since version 0.4.0)")
logging.shutdown()
sys.exit(1)
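# Verify upload signatures by default only if the Debian keyring is available.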
default_verify_sigs = os.access('/usr/share/keyrings/debian-keyring.gpg', os.R_OK)
default_keyrings = []
default_extra_keyrings = []
if configp.has_option('DEFAULT', 'architectures'):
default_architectures = re.split(', *', configp.get('DEFAULT', 'architectures'))
if configp.has_option('DEFAULT', 'verify_sigs'):
default_verify_sigs = configp.getboolean('DEFAULT', 'verify_sigs')
if configp.has_option('DEFAULT', 'trigger_reindex'):
    trigger_reindex = configp.getboolean('DEFAULT', 'trigger_reindex')
if configp.has_option('DEFAULT', 'poll_time'):
default_poll_time = configp.getint('DEFAULT', 'poll_time')
if configp.has_option('DEFAULT', 'max_retry_time'):
default_max_retry_time = configp.getint('DEFAULT', 'max_retry_time')
if configp.has_option('DEFAULT', 'expire_release_files'):
expire_release_files = configp.getboolean('DEFAULT', 'expire_release_files')
if configp.has_option('DEFAULT', 'keyids'):
keyids = re.split(', *', configp.get('DEFAULT', 'keyids'))
if configp.has_option('DEFAULT', 'keyrings'):
default_keyrings = re.split(', *', configp.get('DEFAULT', 'keyrings'))
if configp.has_option('DEFAULT', 'extra_keyrings'):
default_extra_keyrings = re.split(', *', configp.get('DEFAULT', 'extra_keyrings'))
if configp.has_option('DEFAULT', 'use_byhash'):
use_byhash = configp.getboolean('DEFAULT', 'use_byhash')
if configp.has_option('DEFAULT', 'use_dnotify'):
use_dnotify = configp.getboolean('DEFAULT', 'use_dnotify')
if configp.has_option('DEFAULT', 'mail_subject_template'):
mail_subject_template = configp.get('DEFAULT', 'mail_subject_template')
if configp.has_option('DEFAULT', 'mail_body_template'):
mail_body_template = configp.get('DEFAULT', 'mail_body_template')
if configp.has_option('DEFAULT', 'tweet_on_success'):
    tweet_on_success = configp.getboolean('DEFAULT', 'tweet_on_success')
if configp.has_option('DEFAULT', 'tweet_template'):
tweet_template = configp.get('DEFAULT', 'tweet_template')
if configp.has_option('DEFAULT', 'tweet_server'):
tweet_server = configp.get('DEFAULT', 'tweet_server')
if configp.has_option('DEFAULT', 'tweet_user'):
tweet_user = configp.get('DEFAULT', 'tweet_user')
if configp.has_option('DEFAULT', 'tweet_password'):
tweet_password = configp.get('DEFAULT', 'tweet_password')
sects = configp.sections()
if sects:
for sect in sects:
distributions[sect] = {}
if configp.has_option(sect, "architectures"):
distributions[sect]["arches"] = re.split(', *', configp.get(sect, "architectures"))
else:
distributions[sect]["arches"] = default_architectures
else:
for dist in default_distributions:
distributions[dist] = {"arches": default_architectures}
class DistOptionHandler:
def __init__(self, distributions, configp):
self._configp = configp
self._distributions = distributions
user = getpass.getuser()
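        # Each option maps to a [type, default] pair; lookups fall back from
        # the per-distribution section to [DEFAULT], then to the default here.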
self._optionmap = {
            'alias': ['list', None],
'poll_time': ['int', default_poll_time],
'max_retry_time': ['int', default_max_retry_time],
'post_install_script': ['str', None],
'pre_install_script': ['str', None],
'dynamic_reindex': ['bool', True],
'restrict_changes_files': ['bool', None],
'chown_changes_files': ['bool', None],
'keep_old': ['bool', False],
'mail_on_success': ['bool', mail_on_success],
'tweet_on_success': ['bool', tweet_on_success],
'archive_style': ['str', None],
'override_file': ['str', None],
# Release file stuff
'generate_release': ['bool', False],
'release_origin': ['str', user],
'release_label': ['str', user],
'release_suite': ['str', None],
'release_codename': ['str', None],
'experimental_release': ['bool', False],
'backport_release': ['bool', False],
'release_description': ['str', None],
'release_signscript': ['str', None],
'keyids': ['list', None],
'keyrings': ['list', None],
'extra_keyrings': ['list', None],
'expire_release_files': ['bool', False],
'verify_sigs': ['bool', False],
'use_byhash': ['bool', True]
}
def get_option_map(self, dist):
ret = self._distributions[dist]
for (key, value) in list(self._optionmap.items()):
if self._configp.has_option(dist, key):
ret[key] = self.get_option(value[0], dist, key)
elif self._configp.has_option('DEFAULT', key):
ret[key] = self.get_option(value[0], 'DEFAULT', key)
else:
ret[key] = value[1]
return ret
def get_option(self, type, dist, key):
if type == 'int':
return self._configp.getint(dist, key)
elif type == 'str':
return self._configp.get(dist, key)
elif type == 'list':
return re.split(', *', self._configp.get(dist, key))
elif type == 'bool':
return self._configp.getboolean(dist, key)
        assert False, 'unknown option type "%s"' % type
distoptionhandler = DistOptionHandler(distributions, configp)
for dist in list(distributions.keys()):
distributions[dist] = distoptionhandler.get_option_map(dist)
if distributions[dist]['archive_style'] not in ('simple-subdir', 'flat'):
raise DinstallException('Unknown archive style "%s"' % distributions[dist]['archive_style'])
if distributions[dist]['chown_changes_files'] is not None:
logger.warning("'chown_changes_files' is deprecated, please use 'restrict_changes_files' instead")
if distributions[dist]['restrict_changes_files'] is None:
distributions[dist]['restrict_changes_files'] = distributions[dist]['chown_changes_files']
if distributions[dist]['restrict_changes_files'] is None:
distributions[dist]['restrict_changes_files'] = True
logger.debug("Distributions: %s" % distributions)
os.chdir(toplevel_directory)
do_mkdir(dinstall_subdir)
rejectdir = os.path.join(dinstall_subdir, 'REJECT')
incoming_subdir = os.path.join(dinstall_subdir, incoming_subdir)
do_mkdir(rejectdir)
do_mkdir(incoming_subdir)
do_chmod(incoming_subdir, incoming_permissions)
## IPC stuff
# Used by all threads to determine whether or not they should exit
die_event = threading.Event()
# These global variables are used in IncomingDir::daemonize
# I couldn't figure out any way to pass state to a BaseRequestHandler.
reprocess_needed = threading.Event()
reprocess_finished = threading.Event()
reprocess_lock = threading.Lock()
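# The control protocol is line-based: a client sends "RUN\n" or "DIE\n" over
# the Unix socket and reads back a single "200 ..." or "500 ..." status line
# (see the --run/--kill client code above).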
class IncomingDirRequestHandler(socketserver.StreamRequestHandler):
def handle(self):
logger.debug('Got request from %s' % self.client_address)
req = self.rfile.readline().strip().decode('utf-8')
if req == 'RUN':
logger.debug('Doing RUN command')
reprocess_lock.acquire()
reprocess_needed.set()
logger.debug('Waiting on reprocessing')
reprocess_finished.wait()
reprocess_finished.clear()
reprocess_lock.release()
self.wfile.write('200 Reprocessing complete'.encode('utf-8'))
elif req == 'DIE':
logger.debug('Doing DIE command')
self.wfile.write('200 Beginning shutdown'.encode('utf-8'))
die_event.set()
else:
logger.debug('Got unknown command %s' % req)
self.wfile.write('500 Unknown request'.encode('utf-8'))
class ExceptionThrowingThreadedUnixStreamServer(socketserver.ThreadingUnixStreamServer):
    def handle_error(self, request, client_address):
        # The server has no _logger attribute; use the module-level logger.
        logger.exception("Unhandled exception during request processing; shutting down")
        die_event.set()
class IncomingDir(threading.Thread):
def __init__(self, dir, archivemap, logger, trigger_reindex=True, poll_time=30, max_retry_time=172800, batch_mode=False):
threading.Thread.__init__(self, name="incoming")
self._dir = dir
self._archivemap = archivemap
self._logger = logger
self._trigger_reindex = trigger_reindex
self._poll_time = poll_time
self._batch_mode = batch_mode
self._max_retry_time = max_retry_time
self._last_failed_targets = {}
self._eventqueue = queue.Queue()
self._done_event = threading.Event()
# ensure we always have some reprocess queue
self._reprocess_queue = {}
def run(self):
self._logger.info('Created new installer thread (%s)' % self.name)
self._logger.info('Entering batch mode...')
initial_reprocess_queue = []
initial_fucked_list = []
try:
for (changefilename, changefile) in self._get_changefiles():
if self._changefile_ready(changefilename, changefile):
try:
                        self._install_changefile(changefilename, changefile, False)
except Exception:
logger.exception('Unable to install "%s"; adding to screwed list' % changefilename)
initial_fucked_list.append(changefilename)
else:
self._logger.warning('Skipping "%s"; upload incomplete' % changefilename)
initial_reprocess_queue.append(changefilename)
if not self._batch_mode:
self._daemonize(initial_reprocess_queue, initial_fucked_list)
self._done_event.set()
self._logger.info('All packages in incoming dir installed; exiting')
except Exception as e:
self._logger.exception("Unhandled exception; shutting down")
die_event.set()
self._done_event.set()
return False
def _abspath(self, *args):
return os.path.abspath(os.path.join(*[self._dir] + list(args)))
def _get_changefiles(self):
ret = []
globpath = self._abspath("*.changes")
self._logger.debug("glob: %s" % globpath)
changefilenames = glob.glob(globpath)
for changefilename in changefilenames:
if changefilename not in self._reprocess_queue:
self._logger.info('Examining "%s"' % changefilename)
changefile = ChangeFile()
try:
changefile.load_from_file(changefilename)
except ChangeFileException:
self._logger.debug('Unable to parse "%s", skipping' % changefilename)
continue
ret.append((changefilename, changefile))
else:
self._logger.debug('Skipping "%s" during new scan because it is in the reprocess queue.' % changefilename)
return ret
def _changefile_ready(self, changefilename, changefile):
try:
dist = changefile['distribution']
except KeyError as e:
            self._logger.warning('Unable to read distribution field for "%s"; data: %s' % (changefilename, changefile))
return False
try:
changefile.verify(self._abspath(''))
except ChangeFileException:
return False
return True
def _install_changefile(self, changefilename, changefile, doing_reprocess):
changefiledist = changefile['distribution']
for dist in list(distributions.keys()):
distributions[dist] = distoptionhandler.get_option_map(dist)
if distributions[dist]['alias'] and changefiledist in distributions[dist]['alias']:
logger.info('Distribution "%s" is an alias for "%s"' % (changefiledist, dist))
break
else:
dist = changefiledist
if dist not in list(self._archivemap.keys()):
raise DinstallException('Unknown distribution "%s" in "%s"' % (dist, changefilename))
logger.debug('Installing %s in archive %s' % (changefilename, self._archivemap[dist][1].name))
self._archivemap[dist][0].install(changefilename, changefile)
if self._trigger_reindex:
if doing_reprocess:
logger.debug('Waiting on archive %s to reprocess' % self._archivemap[dist][1].name)
self._archivemap[dist][1].wait_reprocess()
else:
logger.debug('Notifying archive %s of change' % self._archivemap[dist][1].name)
self._archivemap[dist][1].notify()
logger.debug('Finished processing %s' % changefilename)
def _reject_changefile(self, changefilename, changefile, e):
dist = changefile['distribution']
if dist not in self._archivemap:
raise DinstallException('Unknown distribution "%s" in "%s"' % (dist, changefilename))
self._archivemap[dist][0].reject(changefilename, changefile, e)
def _daemon_server_isready(self):
(inready, outready, exready) = select.select([self._server.fileno()], [], [], 0)
return bool(inready)
def _daemon_event_ispending(self):
        return die_event.is_set() or reprocess_needed.is_set() or self._daemon_server_isready() or not self._eventqueue.empty()
def _daemon_reprocess_pending(self):
curtime = time.time()
for (starttime, nexttime, delay) in list(self._reprocess_queue.values()):
if curtime >= nexttime:
return True
return False
def _daemonize(self, init_reprocess_queue, init_fucked_list):
self._logger.info('Entering daemon mode...')
self._dnotify = DirectoryNotifierFactory().create([self._dir], use_dnotify=use_dnotify, poll_time=self._poll_time, cancel_event=die_event)
self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name="Incoming watcher")
self._async_dnotify.start()
try:
os.unlink(socket_name)
except OSError as e:
pass
self._server = ExceptionThrowingThreadedUnixStreamServer(socket_name, IncomingDirRequestHandler)
self._server.allow_reuse_address = True
retry_time = 30
self._reprocess_queue = {}
fucked = init_fucked_list
doing_reprocess = False
# Initialize the reprocessing queue
for changefilename in init_reprocess_queue:
curtime = time.time()
self._reprocess_queue[changefilename] = [curtime, curtime, retry_time]
# The main daemon loop
while True:
# Wait until we have something to do
while not (self._daemon_event_ispending() or self._daemon_reprocess_pending()):
time.sleep(0.5)
self._logger.debug('Checking for pending server requests')
if self._daemon_server_isready():
self._logger.debug('Handling one request')
self._server.handle_request()
self._logger.debug('Checking for DIE event')
            if die_event.is_set():
self._logger.debug('DIE event caught')
break
self._logger.debug('Scanning for changes')
# do we have anything to reprocess?
for (changefilename, (starttime, nexttime, delay)) in list(self._reprocess_queue.items()):
curtime = time.time()
try:
changefile = ChangeFile()
changefile.load_from_file(changefilename)
except (ChangeFileException, IOError) as e:
if not os.path.exists(changefilename):
self._logger.info('Changes file "%s" got removed' % changefilename)
else:
self._logger.exception('Unable to load Changes file "%s"' % changefilename)
                        self._logger.warning('Marking "%s" as screwed' % changefilename)
fucked.append(changefilename)
del self._reprocess_queue[changefilename]
continue
if curtime - starttime > self._max_retry_time:
# We've tried too many times; reject it.
self._reject_changefile(changefilename, changefile, DinstallException("Couldn't install \"%s\" in %d seconds" % (changefilename, self._max_retry_time)))
elif curtime >= nexttime:
if self._changefile_ready(changefilename, changefile):
# Let's do it!
self._logger.debug('Preparing to install "%s"' % changefilename)
try:
self._install_changefile(changefilename, changefile, doing_reprocess)
self._logger.debug('Removing "%s" from incoming queue after successful install.' % changefilename)
del self._reprocess_queue[changefilename]
except Exception as e:
logger.exception('Unable to install "%s"; adding to screwed list' % changefilename)
fucked.append(changefilename)
else:
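                        # Exponential backoff: double the retry delay each
                        # time, capped at one hour.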
delay *= 2
if delay > 60 * 60:
delay = 60 * 60
self._logger.info("Upload \"%s\" isn't complete; marking for retry in %d seconds" % (changefilename, delay))
self._reprocess_queue[changefilename][1:3] = [time.time() + delay, delay]
# done reprocessing; now scan for changed dirs.
relname = None
self._logger.debug('Checking dnotify event queue')
if not self._eventqueue.empty():
relname = os.path.basename(os.path.abspath(self._eventqueue.get()))
self._logger.debug('Got %s from dnotify' % relname)
if not (relname or doing_reprocess):
                if reprocess_needed.is_set():
self._logger.info('Got reprocessing event')
reprocess_needed.clear()
doing_reprocess = True
else:
self._logger.debug('No events to process')
continue
for (changefilename, changefile) in self._get_changefiles():
if changefilename in fucked:
                    self._logger.warning('Skipping screwed Changes file "%s"' % changefilename)
continue
# Have we tried this changefile before?
if changefilename not in self._reprocess_queue:
self._logger.debug('New Changes file "%s"' % changefilename)
if self._changefile_ready(changefilename, changefile):
try:
self._install_changefile(changefilename, changefile, doing_reprocess)
except Exception as e:
logger.exception('Unable to install "%s"; adding to screwed list' % changefilename)
fucked.append(changefilename)
else:
curtime = time.time()
self._logger.info("Upload \"%s\" isn't complete; marking for retry in %d seconds" % (changefilename, retry_time))
self._reprocess_queue[changefilename] = [curtime, curtime + retry_time, retry_time]
if doing_reprocess:
doing_reprocess = False
self._logger.info('Reprocessing complete')
reprocess_finished.set()
def wait(self):
self._done_event.wait()
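# Split a full Debian version into components; e.g. "1:2.10-3" yields
# upstream version "2.10" and epoch-stripped Debian version "2.10-3".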
def parse_versions(fullversion):
debianversion = re.sub('^[0-9]+:', '', fullversion)
upstreamver = re.sub('-[^-]*$', '', debianversion)
return (upstreamver, debianversion)
class ArchiveDir:
def __init__(self, dir, logger, configdict, batch_mode=False, keyrings=None, extra_keyrings=None, verify_sigs=False):
self._dir = dir
self._name = os.path.basename(os.path.abspath(dir))
self._logger = logger
for (key, value) in list(configdict.items()):
self._logger.debug('Setting "%s" => "%s" in archive "%s"' % ('_' + key, value, self._name))
self.__dict__['_' + key] = value
do_mkdir(dir)
self._batch_mode = batch_mode
if 'verify_sigs' in configdict:
self._verify_sigs = configdict['verify_sigs']
else:
self._verify_sigs = verify_sigs
if configdict['keyrings']:
self._keyrings = configdict['keyrings']
else:
self._keyrings = keyrings
if configdict['extra_keyrings']:
self._extra_keyrings = configdict['extra_keyrings']
else:
self._extra_keyrings = extra_keyrings
if self._mail_on_success:
self._success_logger = logging.Logger("mini-dinstall." + self._name)
self._success_logger.setLevel(logging.DEBUG)
self.mailHandler = SubjectSpecifyingLoggingSMTPHandler(mail_server, 'Mini-Dinstall <%s@%s>' % (
getpass.getuser(), socket.getfqdn()), [mail_to])
self.mailHandler.setLevel(logging.DEBUG)
self._success_logger.addHandler(self.mailHandler)
self._clean_targets = []
def _abspath(self, *args):
return os.path.abspath(os.path.join(*[self._dir] + list(args)))
def _relpath(self, *args):
return os.path.join(*[self._name] + list(args))
def install(self, changefilename, changefile):
retval = False
try:
retval = self._install_run_scripts(changefilename, changefile)
except Exception:
self._logger.exception("Unhandled exception during installation")
if not retval:
self._logger.info('Failed to install "%s"' % changefilename)
def reject(self, changefilename, changefile, reason):
self._reject_changefile(changefilename, changefile, reason)
def _install_run_scripts(self, changefilename, changefile):
self._logger.info('Preparing to install "%s" in archive %s' % (changefilename, self._name))
sourcename = changefile['source']
version = changefile['version']
if self._verify_sigs:
self._logger.info('Verifying signature on "%s"' % changefilename)
try:
if self._keyrings:
verifier = DebianSigVerifier(keyrings=list(map(os.path.expanduser, self._keyrings)), extra_keyrings=self._extra_keyrings)
else:
verifier = DebianSigVerifier(extra_keyrings=self._extra_keyrings)
output = verifier.verify(changefilename)
logger.debug(output)
logger.info('Good signature on "%s"' % changefilename)
except GPGSigVerificationFailure as e:
msg = 'Failed to verify signature on "%s": %s\n' % (changefilename, e)
msg += ''.join(e.getOutput())
logger.error(msg)
self._reject_changefile(changefilename, changefile, e)
return False
else:
self._logger.debug('Skipping signature verification on "%s"' % changefilename)
if self._pre_install_script:
try:
self._logger.debug("Running pre-installation script: %s" % self._pre_install_script)
if self._run_script(os.path.abspath(changefilename), self._pre_install_script):
return False
except:
self._logger.exception("failure while running pre-installation script")
return False
try:
self._install_changefile_internal(changefilename, changefile)
except Exception as e:
self._logger.exception('Failed to process "%s"' % changefilename)
self._reject_changefile(changefilename, changefile, e)
return False
if self._restrict_changes_files:
do_chmod(changefilename, 0o600)
target = os.path.join(self._dir, os.path.basename(changefilename))
# the final step
do_rename(changefilename, target)
self._logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name))
if self._mail_on_success:
done = False
missing_fields = []
if 'changes' in changefile:
changefile['changes_without_dot'] = misc.format_changes(changefile['changes'].splitlines())
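            # The templates use %(field)s substitution; retry, blanking any
            # field missing from the .changes file, and note the misses below.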
while not done:
try:
mail_subject = mail_subject_template % changefile
mail_body = mail_body_template % changefile
except KeyError as exc:
key = exc.args[0]
changefile[key] = ''
missing_fields.append(key)
else:
done = True
if missing_fields:
mail_body = mail_body + "\n\nMissing Changes file fields: %s" % missing_fields
mail.send(mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(), socket.getfqdn()), mail_to, mail_body, mail_subject)
if self._tweet_on_success:
done = False
missing_fields = []
if 'changes' in changefile:
changefile['changes_without_dot'] = misc.format_changes(changefile['changes'])
while not done:
try:
tweet_body = tweet_template % changefile
except KeyError as exc:
key = exc.args[0]
changefile[key] = ''
missing_fields.append(key)
else:
done = True
if missing_fields:
tweet_body = tweet_body + "\n\n(errs: %s)" % missing_fields
tweet.send(tweet_body, tweet_server, tweet_user, tweet_password)
if self._post_install_script:
try:
self._logger.debug("Running post-installation script: %s" % self._post_install_script)
self._run_script(target, self._post_install_script)
except:
self._logger.exception("failure while running post-installation script")
return False
return True
def _install_changefile_internal(self, changefilename, changefile):
sourcename = changefile['source']
version = changefile['version']
incomingdir = os.path.dirname(changefilename)
newfiles = []
is_native = not native_version_re.match(version)
if is_native:
(ignored, newdebianver) = parse_versions(version)
else:
(newupstreamver, newdebianver) = parse_versions(version)
is_sourceful = False
for file in [x[2] for x in changefile.getFiles()]:
match = debpackage_re.search(file)
if match:
arch = match.group(3)
if arch not in self._arches:
raise DinstallException("Unknown architecture: %s" % arch)
target = self._arch_target(arch, file)
newfiles.append((os.path.join(incomingdir, file), target, match.group(1), arch))
continue
match = debbuildinfo_re.search(file)
if match:
arch = match.group(3)
if arch not in self._arches:
raise DinstallException("Unknown architecture: %s" % arch)
target = self._arch_target(arch, file)
newfiles.append((os.path.join(incomingdir, file), target, match.group(1), arch))
continue
match = debsrc_dsc_re.search(file) or debsrc_diff_re.search(file) \
or debsrc_orig_re.search(file) or debsrc_native_re.search(file)
if match:
is_sourceful = True
target = self._source_target(file)
newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source'))
all_arches = []
for arch in [x[3] for x in newfiles]:
if arch not in all_arches:
all_arches.append(arch)
completed = []
oldfiles = []
if not self._keep_old:
found_old_bins = False
for (oldversion, oldarch) in [x[1:] for x in self._get_package_versions()]:
if oldarch not in all_arches and apt_pkg.version_compare(oldversion, version) < 0:
found_old_bins = True
for (pkgname, arch) in [x[2:] for x in newfiles]:
if arch == 'source' and found_old_bins:
continue
self._logger.debug('Scanning for old files')
for file in self._read_arch_dir(arch):
match = debpackage_re.search(file) or debbuildinfo_re.search(file)
if not match:
continue
oldpkgname = match.group(1)
oldarch = match.group(3)
file = self._arch_target(arch, file)
if file not in [x[0] for x in oldfiles]:
target = file + tmp_old_suffix
if oldpkgname == pkgname and oldarch == arch:
oldfiles.append((file, target))
self._logger.debug('Scanning "%s" for old files' % self._abspath('source'))
for file in self._read_source_dir():
file = self._source_target(file)
if file not in [x[0] for x in oldfiles]:
target = file + tmp_old_suffix
match = debchanges_re.search(file)
if not match and is_sourceful:
match = debsrc_dsc_re.search(file) or debsrc_diff_re.search(file)
if match and match.group(1) == sourcename:
oldfiles.append((file, target))
continue
# We skip the rest of this if it wasn't a
# sourceful upload; really all we do if it isn't
# is clean out old .changes files.
if not is_sourceful:
continue
match = debsrc_orig_re.search(file)
if match and match.group(1) == sourcename:
if not is_native:
(oldupstreamver, olddebianver) = parse_versions(match.group(2))
if apt_pkg.version_compare(oldupstreamver, newupstreamver) < 0:
self._logger.debug('old upstream tarball "%s" version %s < %s, tagging for deletion' % (file, oldupstreamver, newupstreamver))
oldfiles.append((file, target))
continue
else:
self._logger.debug('keeping upstream tarball "%s" version %s' % (file, oldupstreamver))
continue
else:
self._logger.debug('old native tarball "%s", tagging for deletion' % file)
oldfiles.append((file, target))
continue
match = debsrc_native_re.search(file)
if match and match.group(1) in [x[2] for x in newfiles]:
oldfiles.append((file, target))
continue
self._clean_targets = [x[1] for x in oldfiles]
allrenames = oldfiles + [x[:2] for x in newfiles]
try:
for (oldname, newname) in allrenames:
do_rename(oldname, newname)
completed.append((oldname, newname))
except OSError as e:
logger.exception("Failed to do rename (%s); attempting rollback" % e.strerror)
try:
self._logger.error(traceback.format_tb(sys.exc_info()[2]))
except:
pass
            # Unwind to previous state: undo each completed rename in turn.
            for (oldname, newname) in completed:
                do_rename(newname, oldname)
raise
self._clean_targets = []
# remove old files
self.clean()
def _run_script(self, changefilename, script):
if script:
script = os.path.expanduser(script)
cmd = '%s %s' % (script, changefilename)
self._logger.info('Running "%s"' % cmd)
if not no_act:
if not os.access(script, os.X_OK):
self._logger.error("Can't execute script \"%s\"" % script)
return True
try:
subprocess.check_call([script, changefilename])
except subprocess.CalledProcessError as e:
self._logger.error('script "%s" exited with error code %d' % (cmd, e.returncode))
return True
return False
def _reject_changefile(self, changefilename, changefile, exception):
sourcename = changefile['source']
version = changefile['version']
incomingdir = os.path.dirname(changefilename)
try:
f = open(os.path.join(rejectdir, "%s_%s.reason" % (sourcename, version)), 'w')
            if isinstance(exception, str):
                f.write(exception)
            else:
                traceback.print_exception(type(exception), exception, None, file=f)
f.close()
for file in [x[2] for x in changefile.getFiles()]:
if os.access(os.path.join(incomingdir, file), os.R_OK):
file = os.path.join(incomingdir, file)
else:
file = self._abspath(file)
target = os.path.join(rejectdir, os.path.basename(file))
do_rename(file, target)
do_rename(changefilename, os.path.join(rejectdir, os.path.basename(changefilename)))
self._logger.info('Rejecting "%s": %s' % (changefilename, repr(exception)))
except Exception:
self._logger.error("Unhandled exception while rejecting %s; archive may be in inconsistent state" % changefilename)
raise
def clean(self):
self._logger.debug('Removing old files')
for file in self._clean_targets:
self._logger.debug('Deleting "%s"' % file)
if not no_act:
os.unlink(file)
class SimpleSubdirArchiveDir(ArchiveDir):
def __init__(self, *args, **kwargs):
ArchiveDir.__init__(*[self] + list(args), **kwargs)
for arch in self._arches + ['source']:
target = os.path.join(self._dir, arch)
do_mkdir(target)
def _read_source_dir(self):
return os.listdir(self._abspath('source'))
def _read_arch_dir(self, arch):
return os.listdir(self._abspath(arch))
def _arch_target(self, arch, file):
return self._abspath(arch, file)
def _source_target(self, file):
return self._arch_target('source', file)
def _get_package_versions(self):
ret = []
for arch in self._arches:
for file in self._read_arch_dir(arch):
match = debpackage_re.search(file)
if match:
ret.append((match.group(1), match.group(2), match.group(3)))
return ret
class FlatArchiveDir(ArchiveDir):
def _read_source_dir(self):
return os.listdir(self._dir)
def _read_arch_dir(self, arch):
return os.listdir(self._dir)
def _arch_target(self, arch, file):
return self._abspath(file)
def _source_target(self, file):
return self._arch_target('source', file)
def _get_package_versions(self):
ret = []
        for file in os.listdir(self._dir):
match = debpackage_re.search(file)
if match:
ret.append((match.group(1), match.group(2), match.group(3)))
return ret
class ArchiveDirIndexer(threading.Thread):
def __init__(self, dir, logger, configdict, use_dnotify=False, batch_mode=True):
self._dir = dir
self._name = os.path.basename(os.path.abspath(dir))
threading.Thread.__init__(self, name=self._name)
self._logger = logger
self._eventqueue = queue.Queue()
for (key, value) in list(configdict.items()):
self._logger.debug('Setting "%s" => "%s" in archive "%s"' % ('_' + key, value, self._name))
self.__dict__['_' + key] = value
do_mkdir(dir)
self._use_dnotify = use_dnotify
self._batch_mode = batch_mode
self._done_event = threading.Event()
def _abspath(self, *args):
return os.path.abspath(os.path.join(*[self._dir] + list(args)))
def _relpath(self, *args):
return os.path.join(*[self._name] + list(args))
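    # Run apt-ftparchive and tee its output into an uncompressed and an
    # xz-compressed index, writing "<name>.new" first and renaming into place
    # only after generation succeeds.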
def _make_indexfile(self, dir, type, name, arch=None):
cmdline = ['apt-ftparchive',
'-o', 'APT::FTPArchive::AlwaysStat=true',
'-o', 'APT::FTPArchive::SHA1=false',
'-o', 'APT::FTPArchive::MD5=false', type, dir]
if self._override_file:
cmdline.append(self._override_file)
if arch:
cmdline.extend(['--arch', arch])
if not nodb_mode:
cmdline.extend(['--db', '%s.db' % dir])
self._logger.debug("Running: %s" % ' '.join(cmdline))
if no_act:
return
if arch:
packagesfilename = os.path.join(dir, '%s-%s' % (name, arch))
else:
packagesfilename = os.path.join(dir, name)
newpackagesfilename = packagesfilename + '.new'
xzpackagesfilename = packagesfilename + '.xz'
newxzpackagesfilename = newpackagesfilename + '.xz'
newpackagesfile = open(newpackagesfilename, 'wb')
newxzpackagesfile = lzma.open(newxzpackagesfilename, 'w')
process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
cwd=os.path.join(self._dir, '..'))
buf = process.stdout.read(8192)
while buf:
newpackagesfile.write(buf)
newxzpackagesfile.write(buf)
buf = process.stdout.read(8192)
process.wait()
if process.returncode != 0:
raise DinstallException("apt-ftparchive exited with status code %d" % process.returncode)
newpackagesfile.close()
newxzpackagesfile.close()
shutil.move(newpackagesfilename, packagesfilename)
shutil.move(newxzpackagesfilename, xzpackagesfilename)
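        # APT's Acquire-By-Hash scheme: also publish each index under
        # by-hash/<HASH>/<digest> so clients can fetch it race-free; only the
        # 16 most recent copies are kept.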
if self._use_byhash:
for hash in hashes:
do_mkdir(os.path.join(dir, 'by-hash'))
hashdir = os.path.join(dir, 'by-hash', hash.upper())
do_mkdir(hashdir)
xzbyhash = self._get_file_sum(hash, xzpackagesfilename)
shutil.copy(xzpackagesfilename, os.path.join(hashdir, xzbyhash))
mtime = lambda f: os.stat(os.path.join(hashdir, f)).st_mtime
for oldbyhash in sorted(os.listdir(hashdir), key=mtime)[:-16]:
self._logger.debug("Removing old by-hash file: %s" % oldbyhash)
os.remove(os.path.join(hashdir, oldbyhash))
def _make_packagesfile(self, dir):
self._make_indexfile(dir, 'packages', 'Packages')
def _make_sourcesfile(self, dir):
self._make_indexfile(dir, 'sources', 'Sources')
def _make_contentsfile(self, dir, arch):
self._make_indexfile(dir, 'contents', 'Contents', arch)
def _sign_releasefile(self, name, dir):
if self._release_signscript:
try:
self._logger.debug("Running release signing script: %s" % self._release_signscript)
if self._run_script(name, self._release_signscript, dir=dir):
return False
except:
self._logger.exception("failure while running Release signature script")
return False
return True
# Copied from ArchiveDir
def _run_script(self, changefilename, script, dir=None):
if script:
script = os.path.expanduser(script)
cmd = '%s %s' % (script, changefilename)
self._logger.info('Running "%s"' % cmd)
if not no_act:
if not os.access(script, os.X_OK):
self._logger.error("Can't execute script \"%s\"" % script)
return True
try:
subprocess.check_call([script, changefilename], cwd=dir)
except subprocess.CalledProcessError as e:
self._logger.error('script "%s" exited with error code %d' % (cmd, e.returncode))
return True
return False
    def _get_file_sum(self, type, filename):
        ret = misc.get_file_sum(self, type, filename)
        if ret:
            return ret
        raise DinstallException('cannot compute hash of type %s; no builtin method or /usr/bin/%ssum' % (type, type))
    def _do_hash(self, hash, indexfiles, f):
        """
        Write the digest section for one hash algorithm into a Release file.
        @param hash: hash algorithm name, e.g. 'sha256'
        @param indexfiles: index file paths relative to the archive root
        @param f: Release file handle
        """
        f.write("%s%s:\n" % (hash.upper(), ['', 'Sum'][hash == 'md5']))  # 'md5' -> 'MD5Sum:'
for file in indexfiles:
absfile = self._abspath(file)
h = self._get_file_sum(hash, absfile)
size = os.stat(absfile)[stat.ST_SIZE]
f.write(' %s% 16d %s\n' % (h, size, os.path.basename(absfile)))
def _index_all(self, force=False):
self._index(self._arches + ['source'], force)
def _gen_release_all(self, force=False):
self._gen_release(self._arches, force)
def run(self):
self._logger.info('Created new thread (%s) for archive indexer %s' % (self.name, self._name))
self._logger.info('Entering batch mode...')
try:
self._index_all(True)
self._gen_release_all(True)
if not self._batch_mode:
# never returns
self._daemonize()
self._done_event.set()
except Exception as e:
self._logger.exception("Unhandled exception; shutting down")
die_event.set()
self._done_event.set()
self._logger.info('Thread "%s" exiting' % self.name)
    def _daemon_event_ispending(self):
        return die_event.is_set() or not self._eventqueue.empty()
def _daemonize(self):
self._logger.info('Entering daemon mode...')
if self._dynamic_reindex:
self._dnotify = DirectoryNotifierFactory().create(self._get_dnotify_dirs(), use_dnotify=self._use_dnotify, poll_time=self._poll_time, cancel_event=die_event)
self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name=self._name + " Indexer")
self._async_dnotify.start()
# The main daemon loop
while True:
# Wait until we have a pending event
while not self._daemon_event_ispending():
time.sleep(1)
            if die_event.is_set():
break
self._logger.debug('Reading from event queue')
setevent = None
dir = None
obj = self._eventqueue.get()
if isinstance(obj, str):
self._logger.debug('got dir change')
dir = obj
elif not obj:
self._logger.debug('got general event')
setevent = None
            elif isinstance(obj, threading.Event):
self._logger.debug('got wait_reprocess event')
setevent = obj
else:
self._logger.error("unknown object %s in event queue" % obj)
                assert False, 'unknown object in event queue'
# This is to protect against both lots of activity, and to
# prevent race conditions, so we can rely on timestamps.
time.sleep(1)
if not self._reindex_needed():
if setevent:
self._logger.debug('setting wait_reprocess event')
setevent.set()
continue
if not dir:
self._logger.debug('Got general change')
self._index_all(True)
self._gen_release_all(True)
else:
self._logger.debug('Got change in %s' % dir)
self._index([os.path.basename(os.path.abspath(dir))])
self._gen_release([os.path.basename(os.path.abspath(dir))])
if setevent:
self._logger.debug('setting wait_reprocess event')
setevent.set()
def _reindex_needed(self):
if os.access(self._abspath('Release.gpg'), os.R_OK):
gpg_mtime = os.stat(self._abspath('Release.gpg'))[stat.ST_MTIME]
for dir in self._get_dnotify_dirs():
if os.stat(self._abspath(dir))[stat.ST_MTIME] > gpg_mtime:
return True
else:
return True
return False
def _index(self, arches, force=False):
self._index_impl(arches, force)
def _gen_release(self, arches, force=False):
self._gen_release_impl(arches, force)
def wait_reprocess(self):
e = threading.Event()
self._eventqueue.put(e)
self._logger.debug('waiting on reprocess')
        while not (e.is_set() or die_event.is_set()):
time.sleep(0.5)
self._logger.debug('done waiting on reprocess')
def wait(self):
self._done_event.wait()
def notify(self):
self._eventqueue.put(None)
class SimpleSubdirArchiveDirIndexer(ArchiveDirIndexer):
def __init__(self, *args, **kwargs):
ArchiveDirIndexer.__init__(*[self] + list(args), **kwargs)
for arch in self._arches + ['source']:
target = os.path.join(self._dir, arch)
do_mkdir(target)
def _index_impl(self, arches, force=False):
for arch in arches:
dirmtime = os.stat(self._relpath(arch))[stat.ST_MTIME]
if arch != 'source':
pkgsfile = self._relpath(arch, 'Packages')
if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
self._logger.info('Generating Packages file for %s...' % arch)
self._make_packagesfile(self._relpath(arch))
self._logger.info('Packages generation complete')
else:
self._logger.info('Skipping generation of Packages file for %s' % arch)
else:
pkgsfile = self._relpath(arch, 'Sources')
if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
self._logger.info('Generating Sources file for %s...' % arch)
self._make_sourcesfile(self._relpath('source'))
self._logger.info('Sources generation complete')
else:
self._logger.info('Skipping generation of Sources file for %s' % arch)
def _gen_release_impl(self, arches, force=False):
for arch in arches + ['source']:
targetname = self._relpath(arch, 'Release')
            if not self._generate_release:
                if os.access(targetname, os.R_OK):
                    self._logger.info("Release generation disabled, removing existing Release file")
                    try:
                        os.unlink(targetname)
                    except OSError:
                        pass
                # Check the remaining architectures rather than bailing out.
                continue
tmpname = targetname + tmp_new_suffix
if arch != 'source':
uncompr_indexfile = os.path.join(arch, 'Packages')
else:
uncompr_indexfile = os.path.join(arch, 'Sources')
indexfiles = [uncompr_indexfile]
comprexts = ['.xz']
for ext in comprexts:
indexfiles.append(uncompr_indexfile + ext)
if not self._release_needed(targetname, indexfiles):
self._logger.info("Skipping Release generation")
continue
self._logger.info("Generating Release...")
if no_act:
self._logger.info("Release generation complete")
return
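            # A generated per-architecture Release file looks roughly like:
            #   Origin: <user>
            #   Suite: unstable
            #   Codename: unstable/i386
            #   Architectures: i386
            #   SHA256:
            #    <digest> <size> Packages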
f = open(tmpname, 'w')
f.write('Origin: %s\n' % self._release_origin)
f.write('Label: %s\n' % self._release_label)
suite = self._release_suite
if not suite:
suite = self._name
f.write('Suite: %s\n' % suite)
codename = self._release_codename
if not codename:
codename = suite
f.write('Codename: %s/%s\n' % (codename, arch))
if self._experimental_release:
f.write('NotAutomatic: yes\n')
elif self._backport_release:
f.write('NotAutomatic: yes\n')
f.write('ButAutomaticUpgrades: yes\n')
f.write('Date: %s\n' % time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()))
if self._expire_release_files or self._keyids:
f.write('Valid-Until: %s\n' % (datetime.datetime.utcnow() + datetime.timedelta(days=28)).strftime("%a, %d %b %Y %H:%M:%S UTC"))
f.write('Architectures: %s\n' % arch)
if self._keyids:
f.write('Signed-By: %s\n' % ','.join(self._keyids))
if self._use_byhash:
f.write('Acquire-By-Hash: yes\n')
if self._release_description:
f.write('Description: %s\n' % self._release_description)
for hash in hashes:
self._do_hash(hash, indexfiles, f)
f.close()
if self._sign_releasefile(os.path.basename(tmpname), self._abspath(arch)):
os.rename(tmpname, targetname)
self._logger.info("Release generation complete")
    def _in_archdir(self, *args):
        return self._abspath(*args)
    def _get_dnotify_dirs(self):
        return [self._abspath(x) for x in self._arches + ['source']]
def _get_all_indexfiles(self):
return [os.path.join(arch, 'Packages') for arch in self._arches] + ['source/Sources']
def _release_needed(self, targetname, indexfiles):
if os.access(targetname, os.R_OK):
release_mtime = os.stat(targetname)[stat.ST_MTIME]
for file in indexfiles:
if os.stat(self._abspath(file))[stat.ST_MTIME] > release_mtime:
return True
else:
return True
return False
class FlatArchiveDirIndexer(ArchiveDirIndexer):
def __init__(self, *args, **kwargs):
ArchiveDirIndexer.__init__(*[self] + list(args), **kwargs)
def _index_impl(self, arches, force=False):
pkgsfile = self._abspath('Packages')
dirmtime = os.stat(self._relpath())[stat.ST_MTIME]
if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
self._logger.info('Generating Packages file...')
self._make_packagesfile(self._relpath())
self._logger.info('Packages generation complete')
else:
self._logger.info('Skipping generation of Packages file')
pkgsfile = self._abspath('Sources')
if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
self._logger.info('Generating Sources file...')
self._make_sourcesfile(self._relpath())
self._logger.info('Sources generation complete')
else:
self._logger.info('Skipping generation of Sources file')
for arch in self._arches:
if arch in ("all", "source"):
continue
pkgsfile = self._abspath('Contents-%s' % arch)
if force or not os.access(pkgsfile, os.R_OK) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
self._logger.info('Generating Contents file...')
self._make_contentsfile(self._relpath(), arch)
self._logger.info('Contents generation complete')
else:
self._logger.info('Skipping generation of Contents file')
def _gen_release_impl(self, arches, force=False):
targetname = self._abspath('Release')
if not self._generate_release:
if os.access(targetname, os.R_OK):
self._logger.info("Release generation disabled, removing existing Release file")
try:
os.unlink(targetname)
except OSError as e:
pass
return
tmpname = targetname + tmp_new_suffix
uncompr_indexfiles = self._get_all_indexfiles()
indexfiles = []
comprexts = ['.xz']
for index in uncompr_indexfiles:
indexfiles.append(index)
for ext in comprexts:
indexfiles.append(index + ext)
if not self._release_needed(targetname, indexfiles):
self._logger.info("Skipping Release generation")
return
self._logger.info("Generating Release...")
if no_act:
self._logger.info("Release generation complete")
return
f = open(tmpname, 'w')
f.write('Origin: %s\n' % self._release_origin)
f.write('Label: %s\n' % self._release_label)
suite = self._release_suite
if not suite:
suite = self._name
f.write('Suite: %s\n' % suite)
codename = self._release_codename
if not codename:
codename = suite
f.write('Codename: %s\n' % codename)
if self._experimental_release:
f.write('NotAutomatic: yes\n')
elif self._backport_release:
f.write('NotAutomatic: yes\n')
f.write('ButAutomaticUpgrades: yes\n')
f.write('Date: %s\n' % time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()))
if self._expire_release_files or self._keyids:
f.write('Valid-Until: %s\n' % (datetime.datetime.utcnow() + datetime.timedelta(days=28)).strftime("%a, %d %b %Y %H:%M:%S UTC"))
f.write('Architectures: %s\n' % ' '.join(self._arches))
if self._keyids:
f.write('Signed-By: %s\n' % ','.join(self._keyids))
if self._use_byhash:
f.write('Acquire-By-Hash: yes\n')
if self._release_description:
f.write('Description: %s\n' % self._release_description)
for hash in hashes:
self._do_hash(hash, indexfiles, f)
f.close()
if self._sign_releasefile(tmpname, self._abspath()):
os.rename(tmpname, targetname)
self._logger.info("Release generation complete")
    def _in_archdir(self, *args):
        return self._abspath(*args[1:])
def _get_dnotify_dirs(self):
return [self._dir]
def _get_all_indexfiles(self):
allindexes = []
for arch in self._arches:
if arch in ("all", "source"):
continue
allindexes.append("Contents-%s" % arch)
return ['Packages', 'Sources'] + allindexes
def _release_needed(self, targetname, indexfiles):
if os.access(targetname, os.R_OK):
release_mtime = os.stat(targetname)[stat.ST_MTIME]
for file in indexfiles:
if os.stat(self._abspath(file))[stat.ST_MTIME] > release_mtime:
return True
else:
return True
return False
if os.access(lockfilename, os.R_OK):
logger.critical('lockfile "%s" exists (pid %s): is another mini-dinstall running?' % (lockfilename, open(lockfilename).read(10)))
logging.shutdown()
sys.exit(1)
logger.debug('Creating lock file: %s' % lockfilename)
if not no_act:
lockfile = open(lockfilename, 'w')
lockfile.close()
if not (batch_mode or foreground_mode):
# daemonize
logger.debug("Daemonizing...")
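    # Classic double fork: the first child calls setsid() to detach from the
    # controlling terminal, then forks again and exits so the grandchild can
    # never reacquire one.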
if os.fork() == 0:
os.setsid()
if os.fork() != 0:
sys.exit(0)
else:
sys.exit(0)
    sys.stdin.close()
    sys.stdout.close()
    sys.stderr.close()
    for fd in (0, 1, 2):
        try:
            os.close(fd)
        except OSError:
            # Closing the sys.std* objects may already have closed the fd.
            pass
    # unix file descriptor allocation ensures that the following are fd 0,1,2
    sys.stdin = open("/dev/null")
    sys.stdout = open("/dev/null")
    sys.stderr = open("/dev/null")
logger.debug("Finished daemonizing (pid %d)" % os.getpid())
if foreground_mode:
logger.debug("Running in foreground...")
lockfile = open(lockfilename, 'w')
lockfile.write(str(os.getpid()))
lockfile.close()
if not (debug_mode or batch_mode):
# Don't log to stderr past this point
logger.removeHandler(stderr_handler)
archivemap = {}
# Instantiate archive classes for installing files
for (dist, value) in list(distributions.items()):
if value['archive_style'] == 'simple-subdir':
archdir = SimpleSubdirArchiveDir
else:
archdir = FlatArchiveDir
archivemap[dist] = [archdir(dist, logger, value, batch_mode=batch_mode, keyrings=default_keyrings, extra_keyrings=default_extra_keyrings, verify_sigs=default_verify_sigs), None]
# Create archive indexing threads, but don't start them yet
for (dist, value) in list(distributions.items()):
targetdir = os.path.join(toplevel_directory, dist)
logger.info('Initializing archive indexer %s' % dist)
if value['archive_style'] == 'simple-subdir':
archdiridx = SimpleSubdirArchiveDirIndexer
else:
archdiridx = FlatArchiveDirIndexer
archivemap[dist][1] = archdiridx(targetdir, logger, value, use_dnotify=use_dnotify, batch_mode=batch_mode)
# Now: kick off the incoming processor
logger.info('Initializing incoming processor')
incoming = IncomingDir(incoming_subdir, archivemap, logger, trigger_reindex=trigger_reindex, poll_time=default_poll_time, max_retry_time=default_max_retry_time, batch_mode=batch_mode)
logger.debug('Starting incoming processor')
incoming.start()
if batch_mode:
logger.debug('Waiting for incoming processor to finish')
incoming.wait()
# Once we've installed everything, start the indexing threads
for dist in list(distributions.keys()):
archive = archivemap[dist][1]
logger.debug('Starting archive %s' % archive.name)
archive.start()
# Wait for all the indexing threads to finish; none of these ever
# return if we're in daemon mode
if batch_mode:
for dist in list(distributions.keys()):
archive = archivemap[dist][1]
logger.debug('Waiting for archive %s to finish' % archive.name)
archive.wait()
else:
logger.debug("Waiting for die event")
die_event.wait()
logger.info('Die event caught; waiting for incoming processor to finish')
incoming.wait()
for dist in list(distributions.keys()):
archive = archivemap[dist][1]
logger.info('Die event caught; waiting for archive %s to finish' % archive.name)
archive.wait()
logger.debug('Removing lock file: %s' % lockfilename)
os.unlink(lockfilename)
logger.info("main thread exiting...")
sys.exit(0)
# vim:ts=4:sw=4:et: