#!/usr/bin/python
"""
Check APT repository for consistent packages:
- same version for all $ARCH
- package is correct $ARCH directory
- no duplicate version
- apt/ and packages/ in sync
"""
import os
import sys
import logging
from optparse import OptionParser

# Maps a file name stem ("<pkg>_<ver>_<arch>") to the (package, version)
# pair that is recorded instead of the values parsed from the file name.
OVERWRITE = {
    # source package equals binary package, has explicit version overwrite
    "binutils-mingw-w64_2.22-8.29.201403142056+2.2.201403142144_all": ("binutils-mingw-w64", "2.2.201403142144"),
    # binary package matches other source package
    "snappy_0.2-1.4.201403200850_amd64": ("snappy-player", "0.2-1.4.201403200850"),
    "snappy_0.2-1.4.201403200850_i386": ("snappy-player", "0.2-1.4.201403200850"),
}


class Check(object):
    """Compare the "apt" and "packages" trees of one release and count problems."""

    # Root directory containing the "apt" and "packages" sub-trees.
    BASE = "/var/univention/buildsystem2"
    # Release sub-directory used when --release is not given.
    RELEASE = "ucs_4.0-0"

    def __init__(self):
        """Initialise empty result containers and counters."""
        # (pkg, ver, arch) tuples found below BASE/apt/<release>.
        self.apt_files = set()
        # (pkg, ver, arch) tuples found below BASE/packages/<release>.
        self.index_files = set()
        self.log = logging.getLogger()
        # pkg -> version -> arch -> path relative to the scanned root.
        self.packages = {}
        self.errors = 0
        self.warnings = 0
        # Parsed command line options; filled in by parse_args().
        self.opt = None

    def parse_args(self):
        """Parse the command line and store the options in self.opt."""
        usage = "%prog [--fast]"
        # The module docstring doubles as the --help description.
        description = sys.modules[__name__].__doc__
        parser = OptionParser(usage=usage, description=description)
        parser.add_option(
            "-f", "--fast",
            dest="fast", action="store_true",
            help="Skip index validation")
        parser.add_option(
            "-r", "--release",
            dest="release", action="store",
            help="Release [%default]", default=self.RELEASE)
        self.opt, _args = parser.parse_args()

    def main(self):
        """Scan both trees, run all checks and log the totals."""
        self.parse_args()
        self.find_bin(os.path.join(self.BASE, "apt", self.opt.release), self.apt_files)
        self.find_bin(os.path.join(self.BASE, "packages", self.opt.release), self.index_files)
        self.check()
        self.log.info("%d errors, %d warning", self.errors, self.warnings)

    def error(self, *args):
        """Count an error and forward the arguments to the logger."""
        self.errors += 1
        self.log.error(*args)

    def warning(self, *args):
        """Count a warning and forward the arguments to the logger."""
        self.warnings += 1
        self.log.warning(*args)

    def find_bin(self, rootdir, collect):
        """
        Walk rootdir and register every package file found.

        Recognised files are "*.deb", "*.udeb" and "*.dsc", optionally
        followed by a ".packages" or ".sources" suffix.  For each one the
        (pkg, ver, arch) tuple is added to the set `collect` and the
        relative path is stored in self.packages.  Directories whose name
        starts with "dists" are not descended into.
        """
        for dirpath, dirnames, filenames in os.walk(rootdir):
            self.log.debug("Processing directory %s...", dirpath)
            for filename in filenames:
                filepath = os.path.relpath(os.path.join(dirpath, filename), rootdir)
                root, ext = os.path.splitext(filename)
                if ext in (".sources", ".packages"):
                    # Index files carry the package file name plus one suffix.
                    root, ext = os.path.splitext(root)
                try:
                    if ext in (".deb", ".udeb"):
                        pkg, ver, arch = root.split("_")
                    elif ext in (".dsc",):
                        pkg, ver = root.split("_")
                        arch = "source"
                    else:
                        continue
                except ValueError:
                    # Unexpected number of "_" separated fields: report the
                    # file instead of aborting the whole scan.
                    self.error("Malformed file name: %s", filepath)
                    continue
                try:
                    pkg, ver = OVERWRITE[root]
                except LookupError:
                    pass
                collect.add((pkg, ver, arch))
                if not dirpath.endswith(arch):
                    self.error("Wrong directory: %s", filepath)
                versions = self.packages.setdefault(pkg, {})
                archs = versions.setdefault(ver, {})
                archs[arch] = filepath
            # Prune in place so os.walk() skips these directories.  Removing
            # entries while iterating over the same list would skip the
            # element following each removed one.
            dirnames[:] = [name for name in dirnames if not name.startswith('dists')]

    def check(self):
        """Run all checks; the index age check is skipped with --fast."""
        self.check_versions()
        self.check_sync()
        if not self.opt.fast:
            self.check_outdated()

    def check_versions(self):
        """Report packages with several versions or inconsistent architectures."""
        for pkg, versions in self.packages.items():
            if len(versions) > 1:
                self.error("Multiple versions of %s: %s", pkg, ' '.join(sorted(versions)))
                continue
            for archs in versions.values():
                a = archs.get("all")
                b = archs.get("i386")
                c = archs.get("amd64")
                if a and (b or c):
                    self.error("Mixed architectures: %s %s %s", a, b, c)
                elif bool(b) != bool(c):
                    # Packages naming an architecture are accepted when
                    # they exist for that architecture only.
                    if c and "amd64" in pkg:
                        continue
                    if b and ("i386" in pkg or "-486" in pkg or "-686" in pkg):
                        continue
                    self.warning("Missing architecture: %s %s", b, c)

    def check_sync(self):
        """Report every (pkg, ver, arch) present in only one of the two trees."""
        diff = self.apt_files.symmetric_difference(self.index_files)
        # Sorted so that the report order is reproducible between runs.
        for pkg, ver, arch in sorted(diff):
            self.error("Info out-of-sync: %s %s %s", pkg, ver, arch)

    def check_outdated(self):
        """Report index files that are older than the package file they describe."""
        apt_dir = os.path.join(self.BASE, "apt", self.opt.release)
        pkg_dir = os.path.join(self.BASE, "packages", self.opt.release)
        for dirpath, _dirnames, filenames in os.walk(apt_dir):
            for filename in filenames:
                ext = os.path.splitext(filename)[1]
                apt_file = os.path.join(dirpath, filename)
                filepath = os.path.relpath(apt_file, apt_dir)
                index_file = os.path.join(pkg_dir, filepath)
                if ext in (".deb", ".udeb"):
                    index_file += ".packages"
                elif filename.endswith(".dsc"):
                    index_file += ".sources"
                else:
                    continue
                try:
                    apt_stat = os.stat(apt_file)
                    pkg_stat = os.stat(index_file)
                    if apt_stat.st_mtime > pkg_stat.st_mtime:
                        self.error("Outdated index file: %s", filepath)
                except EnvironmentError as ex:
                    self.warning("Missing index file: %s", ex)


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
    Check().main()