#!/usr/bin/python2.7 # vim: set fileencoding=utf-8 filetype=python foldmethod=expr expandtab shiftwidth=4 tabstop=4 : # Copyright 2019-2022 Univention GmbH # # https://www.univention.de/ # # All rights reserved. # # The source code of this program is made available # under the terms of the GNU Affero General Public License version 3 # (GNU AGPL V3) as published by the Free Software Foundation. # # Binary versions of this program provided by Univention to you as # well as other copyrighted, protected or trademarked materials like # Logos, graphics, fonts, specific documentations and configurations, # cryptographic keys etc. are subject to a license agreement between # you and Univention and not subject to the GNU AGPL V3. # # In the case you use this program under the terms of the GNU AGPL V3, # the program is provided in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public # License with the Debian GNU/Linux or Univention distribution in file # /usr/share/common-licenses/AGPL-3; if not, see # . """Univention Directory Notifier Transaction log admin commmand""" from __future__ import print_function from os import SEEK_SET, fstat, rename, unlink, environ from os.path import exists from sys import stderr from argparse import ArgumentParser, ArgumentTypeError from collections import namedtuple from logging import basicConfig, getLogger, CRITICAL, DEBUG from time import strftime, time from ctypes import c_bool, c_ulong, sizeof, Structure from errno import ENOENT from contextlib import contextmanager from itertools import chain from subprocess import call import ldap from ldap.dn import is_dn, str2dn from ldap.ldapobject import ReconnectLDAPObject from ldap.controls import SimplePagedResultsControl from ldap.modlist import addModlist from univention.config_registry import ConfigRegistry try: from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Tuple, Type, NamedTuple # noqa F401 from types import TracebackType # noqa F401 from argparse import Namespace # noqa F401 Transaction = NamedTuple("Transaction", [("tid", int), ("dn", str), ("command", str)]) except ImportError: Transaction = namedtuple("Transaction", ["tid", "dn", "command"]) # type: ignore LOG_FORMAT = "%(asctime)-15s:%(levelname)s:%(message)s" BSIZE = 4096 HELP_RESET = 'See for more details.' HELP_RERUN = 'You can re-run this tool with the option "--fix" in order to try to fix this issue.' HELP_FIX = 'See for more details. ' SLD = 'slapd.service' UDN = 'univention-directory-notifier.service' UDL = 'univention-directory-listener.service' SERVICES = { 'domaincontroller_master': (SLD, UDN, UDL), 'domaincontroller_backup': (UDL, SLD, UDN), } # type: Dict[str, Tuple[str, ...]] class IndexHeader(Structure): """ Header for index file. .. warning:: The header is architecture dependant due to the use of `c_ulong`. """ MAGIC = 0x3395e0d4L _fields_ = [("magic", c_ulong)] _pack_ = 1 class IndexEntry(Structure): """ Entry of index file. .. warning:: The header is architecture dependant due to the use of `c_ulong`. .. warning:: The source is compiled with `-D_FILE_OFFSET_BITS=64` which makes `off_t` 64 bit on i386, too, but `ulong` remains 32 bit. """ _fields_ = [("valid", c_bool), ("offset", c_ulong)] _pack_ = 1 class Index(object): """ Index to efficiently lookup transactions in the translog file. :file:`/var/lib/univention-ldap/notify/transaction.index` """ def __init__(self, filename): # type: (str) -> None self.filename = filename + ".index" self.log = getLogger(__name__).getChild("Index") self.index = None # type: Optional[BinaryIO] self.size = 0 self.count = 0 def __enter__(self): # type: () -> Index try: self.index = index = open(self.filename, 'rb') data = index.read(sizeof(IndexHeader)) assert data, "Empty index" header = IndexHeader.from_buffer_copy(data) assert header.magic == header.MAGIC, header.magic stat = fstat(index.fileno()) self.size = size = stat.st_size except EnvironmentError as ex: self.log.warning("Failed to open %s: %s", self.filename, ex) if ex.errno != ENOENT: raise self.log.info("Creating empty %s", self.filename) self.index = index = open(self.filename, 'wb+') header = IndexHeader(IndexHeader.MAGIC) data = buffer(header)[:] # type: ignore self.index.write(data) self.size = size = len(data) count, reminder = divmod(size - sizeof(IndexHeader), sizeof(IndexEntry)) self.log.info("Index of size %d contains %d entries", size, count) self.count = count - 1 if count else 0 # transaction 0 is never used assert reminder == 0, reminder return self def __exit__(self, exc_type, exc_value, traceback): # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None assert self.index self.index.close() def __getitem__(self, tid): # type: (int) -> IndexEntry """ Return index entry for given transaction. :param tid: Transaction id. :returns: The index entry. :raises IndexEntry: for transactions before 1 or after the current count. """ if tid < 1 or tid > self.count: raise IndexError(tid) assert self.index self.seek(tid) data = self.index.read(sizeof(IndexEntry)) assert data, "Invalid index entry" entry = IndexEntry.from_buffer_copy(data) self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset) return entry def __setitem__(self, tid, offset): # type: (int, int) -> None """ Set index entry for given transaction. :param tid: Transaction id. :param offset: File offset. :raises IndexEntry: for transactions before 1. """ if tid < 1: raise IndexError(tid) if tid > self.count: self.count = tid assert self.index self.seek(tid) entry = IndexEntry(True, offset) data = buffer(entry)[:] # type: ignore self.index.write(data) self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset) def seek(self, tid): # type: (int) -> None """ Seek to given transaction id. :param tid: Transaction id. """ assert 1 <= tid <= self.count assert self.index pos = sizeof(IndexHeader) + tid * sizeof(IndexEntry) self.log.debug("Looking up transaction %d at position %d in index", tid, pos) self.index.seek(pos, SEEK_SET) def remove(self): # type: () -> None """ Remove `.index` file. """ self.log.info('Removing %s ...', self.filename) try: unlink(self.filename) except EnvironmentError as ex: self.log.debug('unlink %s: %s', self.filename, ex) if ex.errno != ENOENT: raise class Translog(object): """ Transactions log file. :file:`/var/lib/univention-ldap/notify/transaction` """ def __init__(self, filename, index=None): # type: (str, Optional[Index]) -> None self.filename = filename self.index = index or Index(filename).__enter__() # type: Index self.log = getLogger(__name__).getChild("Log") self.translog = None # type: Optional[BinaryIO] self.size = 0 self.first = 0 self.last = 0 @property def count(self): # type: () -> int """ Return count of transactions. """ if self.last == 0: return 0 return self.last - self.first + 1 @staticmethod def parse_line(line): # type: (str) -> Transaction """ Parse line from transaction line. :param line: One transaction line :returns: 3-tuples (transaction_number, distinguished_name, command) """ line = line.strip() tid, rest = line.split(' ', 1) dn, command = rest.rsplit(' ', 1) return Transaction(int(tid), dn, command) @staticmethod def format_line(rec): # type: (Transaction) -> str return '%d %s %s\n' % rec def __enter__(self): # type: () -> Translog self.translog = translog = open(self.filename, 'r') stat = fstat(translog.fileno()) self.size = size = stat.st_size if size: line = translog.readline() rec = self.parse_line(line) assert 1 <= rec.tid, rec.tid self.first = rec.tid last = self.index.count if last: self.log.debug("Seeking to last transaction %d...", last) self.seek(last) else: offset = 0 if size < BSIZE else size - BSIZE self.read(offset) for line in self.translog: self.log.debug("Read line %r", line) rec = self.parse_line(line) assert last <= rec.tid, (last, rec.tid) if last < rec.tid: self.log.warn("Index=%d < translog=%d entries", last, rec.tid) self.last = rec.tid self.translog.seek(0, SEEK_SET) return self def __exit__(self, exc_type, exc_value, traceback): # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None assert self.translog self.translog.close() def __getitem__(self, tid): # type: (int) -> Transaction """ Return given transaction. :param tid: Transaction id. :returns: The transaction. :raises IndexEntry: for transactions before 1 or after the current count. """ assert self.index if tid < 1 or tid > self.index.count: raise IndexError(tid) assert self.translog self.seek(tid) line = self.translog.readline() self.log.debug("Read line %r", line) rec = self.parse_line(line) assert tid == rec.tid, rec.tid return rec def __iter__(self): # type: () -> Iterator[Transaction] """ Iterate over all transactions. """ assert self.translog for line in self.translog: self.log.debug("Read line %r", line) rec = self.parse_line(line) yield rec def seek(self, tid): # type: (int) -> int """ Seek to given transaction id. :param tid: Transaction id. :returns: Offset. """ assert self.index assert self.first <= tid <= self.index.count rec = self.index[tid] if not rec.valid: self.log.warn("Transaction %d is invalid", tid) raise IndexError(tid) pos = rec.offset self.log.debug("Seeking to transaction %d at position %d", tid, pos) assert 0 <= pos < self.size, pos assert self.translog self.translog.seek(pos, SEEK_SET) return pos def read(self, offset): # type: (int) -> Transaction """ Read next transaction after given offset. :param offset: absolute file offset. :returns: The transaction. :raises EOFError: if reading past the end. """ assert 0 <= offset <= self.size, offset if offset >= self.size: raise EOFError() assert self.translog pos = max(0, offset - BSIZE) self.translog.seek(pos, SEEK_SET) data = self.translog.read(offset - pos) self.log.debug("Read from %d: %r", pos, data) before = data.rsplit('\n', 1)[-1] after = self.translog.readline() self.log.debug("Read line %r %r", before, after) line = before + after rec = self.parse_line(line) return rec class Abort(Exception): """ Fatal abort. """ class DryRun(Exception): """ Abort dry-run mode. """ def main(): # type: () -> int """ Work with transaction log in LDAP server. """ opt = parse_args() basicConfig(stream=stderr, level=max(DEBUG, CRITICAL - DEBUG * opt.verbose), format=LOG_FORMAT) try: return opt.func(opt) or 0 except Abort: return 1 @contextmanager def ldapi(opt): # type: (Namespace) -> Iterator[ReconnectLDAPObject] """ Return local LDAP connection. :param opt: Command line options. :returns: A initialized LDAP connection. """ log = getLogger(__name__).getChild("ldap") log.debug("ldap_set_option(PROTOCOL_VERSION)") ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) log.debug("ldap_initialize(%s)", opt.translog_ldap) ld = ReconnectLDAPObject( opt.translog_ldap, trace_level=max(0, opt.verbose - 4), retry_max=3, retry_delay=10.0, ) log.debug("ldap_bind()") ld.sasl_external_bind_s() yield ld log.debug("ldap_unbind()") ld.unbind_ext_s() def import_all(opt): # type: (Namespace) -> None """ Load transaction from file into LDAP server. :param opt: Command line options. """ log = getLogger(__name__).getChild("import") log.info("Reading transactions from file '%s'...", opt.translog_file) with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog, ldapi(opt) as ld: if not translog.size: log.debug("File is empty; no transactions to process.") return c_tid = max(1, translog.last - opt.count + 1) log.debug("count.tid=%d count=%d", c_tid, opt.count) p_offset = int(translog.size * (1.0 - opt.percent)) try: p_trans = translog.read(p_offset) p_tid = p_trans.tid except EOFError: p_tid = translog.last log.debug("percent.tid=%d off=%d percent=%f", p_tid, p_offset, opt.percent) s_offset = max(0, translog.size - opt.size) try: s_trans = translog.read(s_offset) s_tid = s_trans.tid except EOFError: s_tid = translog.last log.debug("size.tid=%d off=%d size=%d", s_tid, s_offset, opt.size) start = max(1, min(c_tid, p_tid, s_tid), translog.first, opt.min) assert translog.first <= start <= translog.last, start end = min(opt.max or translog.last, translog.last) if opt.index and index.count: log.info("Processing transactions %d .. %d", start, end) translog.seek(start) lead = [] # type: List[Transaction] else: offset = min(p_offset, s_offset) log.info("Processing transaction from offset %d .. %d", offset, end) rec = translog.read(offset) lead = [rec] t_begin = t_last = time() count = end - start for rec in chain(lead, translog): if rec.tid < start: continue if rec.tid > end: break add_transaction(opt, ld, rec) if stderr.isatty(): t_now = time() if t_now - t_last >= 5.0: p_done = float(rec.tid - start) / count t_used = t_now - t_begin stderr.write('\rprocessed {:d} of {:d} [{:.1f}%] in {:.1f}s, {:d} remaining in {:.1f}s \r'.format( rec.tid - start, count, p_done * 100.0, t_used, end - rec.tid, t_used / p_done - t_used, )) t_last = t_now def load_single(opt): # type: (Namespace) -> None """ Load given transactions into LDAP server. :param opt: Command line options. """ with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog, ldapi(opt) as ld: for tid in opt.trans: try: rec = translog[tid] except IndexError: continue add_transaction(opt, ld, rec) def show_stat(opt): # type: (Namespace) -> None """ Show statistics :param opt: Command line options. """ with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog: print("Index.file: %s" % index.filename) print("Index.size: %d" % index.size) print("Index.count: %d" % index.count) print("Index.okay: %s" % ('yes' if index.count == translog.last else 'no',)) print("Translog.file: %s" % translog.filename) print("Translog.size: %d" % translog.size) print("Translog.first: %d" % translog.first) print("Translog.last: %d" % translog.last) print("Translog.count: %d" % translog.count) first, last = translog.first, translog.last tid = last with ldapi(opt) as ld: while tid >= first: dn = "reqSession={},{}".format(tid, opt.translog_base) try: ld.search_ext_s(dn, ldap.SCOPE_BASE) break except ldap.NO_SUCH_OBJECT: tid -= 1 print("Ldap.last: %d" % tid) print("Ldap.okay: %s" % ('yes' if last == tid else 'no',)) def dump_index(opt): # type: (Namespace) -> None """ Dump transaction index :param opt: Command line options. """ log = getLogger(__name__).getChild("INDEX") with Index(opt.translog_file) as index: begin = opt.min end = min(opt.max or index.count, index.count) log.debug('Dumping index from [%d, %d] ...', begin, end) for tid in xrange(begin, end + 1): rec = index[tid] print('%8d[%c]: %d' % (tid, 'x' if rec.valid else ' ', rec.offset)) def reindex(opt): # type: (Namespace) -> None """ Re-build transaction index :param opt: Command line options. """ log = getLogger(__name__).getChild("REINDEX") index = Index(opt.translog_file) log.warning("Ignore the following 2 WARNINGS about the missing and mismatching index") with stopped(opt, (UDN,)), rewrite(index.filename) as index.filename: with index, Translog(opt.translog_file, index) as translog: log.warning("Re-building index %s now...", index.filename) assert translog.translog is not None pos = 0 for lnr, line in enumerate(translog.translog, start=1): rec = translog.parse_line(line) log.info("%d: tid=%d @ %d", lnr, rec.tid, pos) index[rec.tid] = pos pos += len(line.encode()) def lookup(opt): # type: (Namespace) -> None """ Lookup transactions. :param opt: Command line options. """ with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog: for tid in opt.trans: print(translog[tid]) def lookup_ldap(opt): # type: (Namespace) -> None """ Check transactions. :param opt: Command line options. """ log = getLogger(__name__).getChild("LDAP") with ldapi(opt) as ld: for tid in opt.trans: dn = "reqSession={},{}".format(tid, opt.translog_base) try: result = ld.search_ext_s(dn, ldap.SCOPE_BASE) except ldap.NO_SUCH_OBJECT as ex: if not opt.lenient: log.critical("ldap_search(%s): %s", dn, ex.args[0]["desc"]) raise Abort() ((dn, attrs),) = result print('tid={0[reqSession][0]} dn={0[reqDN][0]} command={0[reqType][0]}'.format(attrs)) def add_transaction(opt, ld, rec): # type: (Namespace, ReconnectLDAPObject, Transaction) -> None """ Load single transaction into LDAP server. :param opt: Command line options. :param ld: LDAP server connection. :param rec: Transaction to add. :raises Abort: on fatal errors. """ log = getLogger(__name__).getChild("LDAP") dn = "reqSession={.tid},{}".format(rec, opt.translog_base) modlist = addModlist({ "objectClass": ["auditObject"], "reqStart": [opt.datetime], "reqType": ["{.command}".format(rec)], "reqSession": ["{.tid}".format(rec)], "reqDN": ["{.dn}".format(rec)], }) try: log.debug("ldap_add(%s)", dn) if not opt.dry_run: ld.add_ext_s(dn, modlist) except (ldap.ALREADY_EXISTS, ldap.INVALID_SYNTAX) as ex: if not opt.lenient: log.critical("ldap_add(%s): %s", rec, ex.args[0]["desc"]) raise Abort() log.error("ldap_add(%s): %s", rec, ex.args[0]["desc"]) except ldap.LDAPError as ex: log.critical("ldap_add(%s): %s", rec, ex) raise Abort() def check(opt): # type: (Namespace) -> None """ Check transaction files for consistency. :param opt: Command line options. """ if opt.role not in SERVICES: print('This command is only for UCS Master and Backups.') exit(0) with stopped(opt, SERVICES[opt.role]): translog = CheckTranslog(opt, opt.translog_file) translog.run() listener = CheckListener(opt, opt.listener_file) if exists(opt.listener_private_file): listener_priv = CheckListener(opt, opt.listener_private_file) listener_priv.first_tid = translog.last_tid listener_priv.first_line = translog.last_line listener_priv.run() listener.first_tid = listener_priv.last_tid listener.first_line = listener_priv.last_line else: listener.first_tid = translog.last_tid listener.first_line = translog.last_line listener.run() if opt.role == 'domaincontroller_master': check_last(opt, opt.last_file, listener.last_tid) # TODO: check cn=translog (Bug #49225) @contextmanager def stopped(opt, services): # type: (Namespace, Tuple[str, ...]) -> Iterator """ Stop services while in context. :param opt: Command line options. :param services: Sequence of service names. """ log = getLogger(__name__).getChild("SERVICE") if opt.skip_services or not opt.fix: yield return log.info("Stopping %s", services) cmd = ('systemctl', 'stop') + services call(cmd) try: yield except Exception: raise else: log.info("Starting %s", services) cmd = ('systemctl', 'start') + services call(cmd) @contextmanager def rewrite(filename): # type: (str) -> Iterator[str] """ Return name of temporary file to replace given file on success. :param filename: Name of file to replace on success. :returns: Name of temporary file. """ log = getLogger(__name__).getChild("rewrite") cur = filename new = filename + '.new' bak = filename + '.bak' try: yield new except Exception as ex: log.debug('abort: %s', ex) if exists(new): try: unlink(new) except EnvironmentError as ex: log.debug('unlink %s: %s', new, ex) if ex.errno != ENOENT: raise raise else: log.debug('renaming %s %s', cur, bak) try: rename(cur, bak) except EnvironmentError as ex: log.debug('rename %s %s: %s', cur, bak) if ex.errno != ENOENT: raise log.debug('renaming %s %s', new, cur) rename(new, cur) class CheckGeneric(object): """ Check transaction file for consistency. """ def __init__(self, opt, filename): # type: (Namespace, str) -> None """ Check transaction file for consistency. :param opt: Command line options. :param filename: The name of the file containing transaction lines. """ self.opt = opt self.filename = filename self.log = getLogger(__name__).getChild("CHECK").getChild(self.__class__.__name__) self.first_tid = 0 self.first_line = '' self.reset() def reset(self): # type: () -> None """ Reset internal state. """ self.needs_fixing = False self.needs_sort = False self.needs_fill = False self.needs_renum = False self.needs_syntax = False self.lnr = 0 self.line = '' self.last_line = self.first_line self.last_tid = self.first_tid def error(self, string, *args, **kwargs): # type: (str, *Any, **bool) -> None """ Flag error in file. :param string: A descriptive format string, which is formatted using any additional positional arguments. :param bool fix: The error must be fixed. :param bool sort: The error can be fixed by sorting the transactions. :param bool fill: The error can be fixed by filling the hole with dummy entries. :param bool renum: The error can be fixed by re-numbering all transactions. :param bool syntax: The line contains syntax error needing manual fixing. """ self.log.error('%s:%d:%r: ' + string, self.filename, self.lnr, self.line, *args) self.needs_fixing |= kwargs.get('fix', True) self.needs_sort |= kwargs.get('sort', False) self.needs_fill |= kwargs.get('fill', False) self.needs_renum |= kwargs.get('renum', False) self.needs_syntax |= kwargs.get('syntax', False) def run(self, filename=''): # type: (str) -> None """ Run check on trasnaction file. :param filename: The name of the transaction file. Defaults to :py:attribute:`filename`. """ self.reset() with open(filename or self.filename, 'r') as stream: self.log.info("Reading %s ...", filename or self.filename) for lnr, line, rec in self.loop(stream): self.check_transaction(rec) self.log.info("Reading %s done", filename or self.filename) if self.needs_fixing and filename == '': print() print('%s needs fixing:' % (self.filename,)) if self.opt.role != 'domaincontroller_master': print('This UCS system must be re-joined!') print() print(HELP_FIX) exit(1) self.fixit() def loop(self, stream): # type: (BinaryIO) -> Iterator[Tuple[int, str, Transaction]] """ Loop over all lines in the stream. :param stream: The input stream. :returns: yields 3-tuples (line-number, original-line-content, parsed-transaction-record). """ for self.lnr, line in enumerate(stream, start=1): self.line = line try: if line.endswith('\n'): line = line[:-1] else: self.error("Missing newline line", fix=False) stripped = line.strip() if line != stripped: self.error("Extra whitespace", fix=False) line = stripped stripped = line.strip('\x00') if line != stripped: self.error("Binary zeros", fix=False) line = stripped try: rec = Translog.parse_line(line) except ValueError: # Ok let's try to identify the problem try: tid_, rest = line.split(' ', 1) except ValueError: self.error("Invalid line", syntax=True) continue try: tid = int(tid_) except ValueError: tid = -1 try: dn, command = rest.split(' ', 1) except ValueError: self.error("Invalid line", syntax=True) dn, command = rest, '' rec = Transaction(tid, dn, command) if not 0 < rec.tid: self.error("Invalid transaction id", renum=True) continue if rec.command not in {'r', 'd', 'a', 'm'}: # mod_Rdn Delete Add Modify self.error("Invalid command") # if self.opt.fix: # rec = rec._replace(command='m') if not is_dn(rec.dn): self.error("Invalid dn") # if self.opt.fix: # rec = rec._replace(dn=self.opt.base) elif not rec.dn.endswith(self.opt.base): self.error("Foreign dn", fix=False) yield (self.lnr, line, rec) self.last_tid = rec.tid finally: self.last_line = line def check_transaction(self, rec): # type: (Transaction) -> None """ Check transaction for consistency. :param rec: The transaction record to check. """ raise NotImplementedError() def fixit(self): # type: () -> None """ Fix transactions. """ raise NotImplementedError() def fix_syntax(self): # type: () -> None # TODO: Loop over lines and # - strip binary zeros # - add missing command in 3rd column # - optional: fake invalid DNs in 2nd column # - optional: fake transaction number # see commented out code in :py:meth:`loop`. """ Handle syntax errors. TODO """ self.log.info('Syntax ...') print('- contains unparseable lines!') print() print(HELP_RESET) exit(1) def fix_sort(self): # type: () -> None """ Handle transactions not being sorted strong monotonically increasing. """ self.log.info('Sorting ...') print('- the transactions are not sorted uniquely') with rewrite(self.filename) as tmp: cmd = ( 'sort', '--ignore-leading-blanks', '--ignore-nonprinting', '--key', '1n', # transaction ID '--key', '2', # DN '--key', '3r', # command 'r', 'm', 'd', 'a' '--unique', '--output', tmp, self.filename, ) env = dict(environ) env['LC_ALL'] = 'C.UTF-8' self.log.debug('Running %r ...', cmd) if call(cmd, env=env): print('- failed to sort file!') print('Check available space of filesystem?') exit(2) self.run(filename=tmp) if self.needs_sort: print('- still contains duplicate transactions after unique sorting!') print('UCS Master must be reset and all other UCS systems must be re-joined!') print() print(HELP_RESET) exit(1) if not self.opt.fix: print() print(HELP_RERUN) print(HELP_FIX) exit(1) self.remove_index() self.log.info('Sorting done') def fix_fill(self): # type: () -> None """ Handle transactions not being consecutive. """ self.log.info('Filling holes ...') print('- missing transactions in sequence') if not self.opt.fix: print() print(HELP_RERUN) print(HELP_FIX) exit(1) with rewrite(self.filename) as tmp: with open(self.filename, 'r') as old, open(tmp, 'w') as new: next_tid = 0 for line in old: try: rec = Translog.parse_line(line) assert next_tid == 0 or next_tid <= rec.tid, (next_tid, rec.tid) while next_tid != 0 and next_tid < rec.tid: self.log.info('Filling %d ...', next_tid) fill_rec = Transaction(tid=next_tid, dn=self.opt.base, command='m') fill_line = Translog.format_line(fill_rec) new.write(fill_line) next_tid += 1 next_tid = rec.tid + 1 except ValueError: pass new.write(line) self.run(filename=tmp) assert not self.needs_fill self.remove_index() self.log.info('Filling holes done') def fix_renumber(self): # type: () -> None """ Handle transactions being out-of-order. """ self.log.info('Renumbering ...') print('- pending transactions are not consecutive') if not self.opt.fix: print() print(HELP_RERUN) print(HELP_FIX) exit(1) with rewrite(self.filename) as tmp: with open(self.filename, 'r') as old, open(tmp, 'w') as new: for self.last_tid, line in enumerate(old, start=self.first_tid + 1): line = line.rstrip('\n') self.log.debug('Renum %r to %d ...', line, self.last_tid) try: rec = Translog.parse_line(line) except ValueError: try: tid_, rest = line.split(' ', 1) except ValueError: raise try: tid = int(tid_) except ValueError: tid = -1 try: dn, command = rest.split(' ', 1) except ValueError: raise rec = Transaction(tid, dn, command) rec = rec._replace(tid=self.last_tid) line = Translog.format_line(rec) new.write(line) self.run(filename=tmp) assert not self.needs_renum self.log.info('Renumbering done') def remove_index(self): # type: () -> None """ Remove any associated index file. """ raise NotImplementedError() class CheckTranslog(CheckGeneric): """ Check transactions log file for consistency. :file:`/var/lib/univention-ldap/notify/transaction` """ def check_transaction(self, rec): # type: (Transaction) -> None """ Check committed transaction. :param rec: The transaction record to check. """ if self.lnr == 1: self.log.info("%s:%d: Starts with: %r", self.filename, self.lnr, self.line) elif rec.tid <= self.last_tid: self.error("Repeated line after %r", self.last_line, sort=True) elif self.last_tid + 1 != rec.tid: self.error("Hole after %r", self.last_line, fill=True) def fixit(self): # type: () -> None """ Fix transactions. """ if self.needs_syntax: self.fix_syntax() if self.needs_sort: self.fix_sort() if self.needs_fill: self.fix_fill() def remove_index(self): # type: () -> None """ Remove `.index` file. """ Index(self.filename).remove() class CheckListener(CheckGeneric): """ Check pending transactions log file for consistency. :file:`/var/lib/univention-ldap/listener/listener` """ def check_transaction(self, rec): # type: (Transaction) -> None """ Check pending transaction. :param rec: Transaction record. """ if self.last_tid + 1 == rec.tid: pass elif rec.tid <= self.last_tid: self.error("Repeated line %r", self.last_line, renum=True) elif self.lnr == 1: self.error("Not continuous with %s: %r", self.opt.translog_file, self.last_line, renum=True) else: self.error("Hole after %r", self.last_line, renum=True) def fixit(self): # type: () -> None """ Fix transactions. """ if self.needs_syntax: self.fix_syntax() if self.needs_renum: self.fix_renumber() def remove_index(self): # type: () -> None """ No associated `.index` file to remove. """ pass def check_last(opt, filename, last_tid): # type: (Namespace, str, int) -> None """ Check last used transactions id. :file:`/var/lib/univention-ldap/last_id` :param opt: Command line options. :param filename: File name. :param last_tid: Expected transaction id. """ log = getLogger(__name__).getChild("CHECK") try: with open(filename, 'r') as last: line = last.read() if not line: raise ValueError('should be %d, but is empty' % (last_tid,)) tid = int(line) if tid != last_tid: raise ValueError('should be %d, but is %d' % (last_tid, tid)) except (EnvironmentError, ValueError) as ex: log.error("%s: Invalid last id: %s", filename, ex) if opt.fix: with open(filename, 'w') as last: last.write('%d' % (last_tid,)) log.debug("%s: Updated to: %d", filename, last_tid) else: print() print('%s needs manual fixing!' % (filename,)) print() print(HELP_RERUN) print(HELP_FIX) exit(1) def prune(opt): # type: (Namespace) -> None """ Prune old transactions. :param opt: Command line options. """ if opt.role not in SERVICES: print('This command is only for UCS Master and Backups.') exit(0) opt.skip_services = opt.dry_run with stopped(opt, (UDN,)): try: prune_file(opt) except DryRun: pass prune_ldap(opt) def prune_file(opt): """ Prune old transactions from file. :param opt: Command line options. """ log = getLogger(__name__).getChild("prune").getChild("file") with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog: if opt.trans < 0: log.info('Keeping last %d transactions before %d', -opt.trans, translog.last) opt.trans = translog.last + opt.trans + 1 log.info('Purging/keeping %d..%d..%d', translog.first, opt.trans, translog.last) if opt.trans <= translog.first: log.fatal('Already purged.') raise Abort() if opt.trans >= translog.last: log.fatal('Nothing to purge.') raise Abort() assert translog.first < opt.trans < translog.last with rewrite(opt.translog_file) as tmp, open(tmp, 'w') as new: for rec in translog: if rec.tid < opt.trans: log.debug('Dropping %d', rec.tid) continue line = Translog.format_line(rec) new.write(line) if opt.dry_run: raise DryRun() index.remove() def prune_ldap(opt): """ Prune old transactions from LDAP. :param opt: Command line options. """ log = getLogger(__name__).getChild("prune").getChild("ldap") base = opt.translog_base scope = ldap.SCOPE_ONELEVEL filterstr = '(reqSession=*)' attrlist = ['1.1'] # LDAP_NO_ATTRS page_control = SimplePagedResultsControl(True, size=1000, cookie='') with ldapi(opt) as ld: while True: response = ld.search_ext(base, scope, filterstr, attrlist, serverctrls=[page_control]) rtype, rdata, rmsgid, serverctrls = ld.result3(response) for control in serverctrls: if control.controlType == SimplePagedResultsControl.controlType: break else: log.fatal('Server ignores RFC 2696 control: %r', serverctrls) raise Abort() for (dn, attrs) in rdata: ava = str2dn(dn) for (attr, value, _flags) in ava[0]: if attr == 'reqSession': break else: continue tid = int(value) if tid >= opt.trans: log.debug('Keeping %s', dn) continue try: log.debug("ldap_delete(%s)", dn) if not opt.dry_run: ld.delete_ext_s(dn) log.info("Deleted %s", dn) except ldap.LDAPError as ex: log.error("ldap_delete(%s): %s", dn, ex) if not control.cookie: break page_control.cookie = control.cookie def parse_args(args=None): # type: (List[str]) -> Namespace """ Parse command line arguments. :param args: the list of arguments to process (default: `sys.argv[1:]`) :returns: a Namespace instance. """ ucr = ConfigRegistry() ucr.load() parser = ArgumentParser(description=__doc__) parser.add_argument("--translog-file", "-T", metavar="FILENAME", help="Transaction file [%(default)s]", default="/var/lib/univention-ldap/notify/transaction") parser.add_argument("--translog-ldap", "-H", metavar="URL", help="LDAP URL [%(default)s]", default="ldapi:///") parser.add_argument("--translog-base", "-B", metavar="DN", help="LDAP base for translog [%(default)s]", default="cn=translog") parser.add_argument("--lenient", "-l", action="store_true", help="Ignore existing entries") parser.add_argument("--verbose", "-v", action="count", help="Increase verbosity", default=2) parser.add_argument("--dry-run", "-n", action="store_true", help="Do not modify anything") parser.add_argument("--datetime", "-d", help="Overwrite time-stamp for import [%(default)s]", default=strftime("%Y%m%d%H%M%SZ")) subparsers = parser.add_subparsers(title="subcommands", description="valid subcommands") parser_import = subparsers.add_parser("import", help="Import transaction file into LDAP") parser_import.add_argument("--index", "-i", action="store_false", help="Do not use index to find transactions") import_amount = parser_import.add_argument_group("minimum", description="Amount of transactions to import") import_amount.add_argument("--count", "-c", type=parse_int_pos, help="Minimum number of transaction [%(default)s]", default=100000) import_amount.add_argument("--percent", "-p", type=parse_percent, help="Minimum percentage of transaction file [%(default)s%%]", default="0") import_amount.add_argument("--size", "-s", type=parse_size, help="Minimum number of bytes from transaction file [%(default)s]", default="10M") import_limit = parser_import.add_argument_group("limit", description="Limit transactions to process") import_limit.add_argument("--min", "-m", metavar="TID", type=parse_int_pos, help="First transaction ID to process [%(default)s]", default=1) import_limit.add_argument("--max", "-M", metavar="TID", type=parse_int_pos, help="Last transaction ID to process") parser_import.set_defaults(func=import_all) parser_load = subparsers.add_parser("load", help="Load specified transactions into LDAP") parser_load.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to process") parser_load.add_argument("--index", "-i", action="store_false", help="Do not use index to find transactions") parser_load.set_defaults(func=load_single) parser_lookup = subparsers.add_parser("lookup", help="Lookup transaction in file") parser_lookup.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to lookup") parser_lookup.set_defaults(func=lookup) parser_ldap = subparsers.add_parser("ldap", help="Lookup transaction in LDAP") parser_ldap.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to lookup") parser_ldap.set_defaults(func=lookup_ldap) parser_stat = subparsers.add_parser("stat", help="Show statistics") parser_stat.set_defaults(func=show_stat) parser_index = subparsers.add_parser("index", help="Dump index file") parser_index.add_argument("--min", "-m", metavar="TID", type=parse_int_pos, help="First transaction ID to process [%(default)s]", default=1) parser_index.add_argument("--max", "-M", metavar="TID", type=parse_int_pos, help="Last transaction ID to process") parser_index.set_defaults(func=dump_index) parser_reindex = subparsers.add_parser("reindex", help="Re-build index file") parser_reindex.add_argument("--skip-services", "-S", action="store_true", help="Skip stopping and starting services") parser_reindex.set_defaults(func=reindex, fix=True) parser_check = subparsers.add_parser("check", help="Check transaction files") parser_check.add_argument("--fix", "-f", action="store_true", help="Try to fix issues") parser_check.add_argument("--base", "-b", metavar="DN", help="LDAP base [%(default)s]", default=ucr['ldap/base']) parser_check.add_argument("--role", "-r", metavar="ROLE", help="Server role [%(default)s]", default=ucr['server/role']) parser_check.add_argument("--listener-file", "-L", metavar="FILENAME", help="Listener file [%(default)s]", default="/var/lib/univention-ldap/listener/listener") parser_check.add_argument("--listener-private-file", "-P", metavar="FILENAME", help="Listener private file [%(default)s]", default="/var/lib/univention-ldap/listener/listener.priv") parser_check.add_argument("--last-file", "-I", metavar="FILENAME", help="Last_id file [%(default)s]", default="/var/lib/univention-ldap/last_id") parser_check.add_argument("--skip-services", "-S", action="store_true", help="Skip stopping and starting services") parser_check.set_defaults(func=check) parser_prune = subparsers.add_parser("prune", help="Prune transaction files") parser_prune.add_argument("trans", metavar="TID", type=int, help="Oldest transaction number to keep (negative numbers: the number of transactions to keep)") parser_prune.add_argument("--role", "-r", metavar="ROLE", help="Server role [%(default)s]", default=ucr['server/role']) parser_prune.set_defaults(func=prune, fix=True) opt = parser.parse_args(args) return opt def parse_int_pos(string): # type: (str) -> long """ Parse positive integer string. :param string: The command line string. :returns: The parsed integer. :raises ArgumentTypeError: if the string is valid. """ try: val = long(string) if 0 < val: return val except ValueError: pass raise ArgumentTypeError("Positive integer required") def parse_percent(string): # type: (str) -> float """ Parse percentage string. :param string: The command line string. :returns: The parsed percentage. :raises ArgumentTypeError: if the string is valid. """ try: val = float(string.rstrip('%')) if not 0.0 <= val <= 100.0: raise ValueError() except ValueError: raise ArgumentTypeError("Invalid percentage") return val / 100.0 def parse_size(string): # type: (str) -> long """ Parse size string. :param string: The command line string. :returns: The parsed size. :raises ArgumentTypeError: if the string is valid. """ suffix = string.lstrip(".0123456789") try: unit, = suffix.rstrip("iIbB").upper() or ' ' scale = 1L << (10 * " KMGTPE".index(unit)) except ValueError: raise ArgumentTypeError("Invalid unit") prefix = string[:-len(suffix)] if suffix else string try: value = float(prefix) if not 0.0 <= value: raise ValueError() except ValueError: raise ArgumentTypeError("Invalid value") return long(value * scale) if __name__ == "__main__": exit(main())