Univention Bugzilla – Attachment 11049 Details for
Bug 55863
Notifier indexing slows rapidly down the last pending 999 transactions to index
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Requests
|
Help
|
New Account
|
Log In
[x]
|
Forgot Password
Login:
[x]
univention-translog backport to Python 2.7 / UCS 4.4-x (
univention-translog-backport.py (text/x-python), 48.15 KB, created by
Mirac Erdemiroglu
on 2023-03-08 08:23:04 CET
(
hide
)
Description:
univention-translog backport to Python 2.7 / UCS 4.4-x (
Filename:
MIME Type:
Creator:
Mirac Erdemiroglu
Created:
2023-03-08 08:23:04 CET
Size:
48.15 KB
patch
obsolete
>#!/usr/bin/python2.7 ># vim: set fileencoding=utf-8 filetype=python foldmethod=expr expandtab shiftwidth=4 tabstop=4 : ># Copyright 2019-2022 Univention GmbH ># ># https://www.univention.de/ ># ># All rights reserved. ># ># The source code of this program is made available ># under the terms of the GNU Affero General Public License version 3 ># (GNU AGPL V3) as published by the Free Software Foundation. ># ># Binary versions of this program provided by Univention to you as ># well as other copyrighted, protected or trademarked materials like ># Logos, graphics, fonts, specific documentations and configurations, ># cryptographic keys etc. are subject to a license agreement between ># you and Univention and not subject to the GNU AGPL V3. ># ># In the case you use this program under the terms of the GNU AGPL V3, ># the program is provided in the hope that it will be useful, ># but WITHOUT ANY WARRANTY; without even the implied warranty of ># MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ># GNU Affero General Public License for more details. ># ># You should have received a copy of the GNU Affero General Public ># License with the Debian GNU/Linux or Univention distribution in file ># /usr/share/common-licenses/AGPL-3; if not, see ># <https://www.gnu.org/licenses/>. > >"""Univention Directory Notifier Transaction log admin commmand""" > >from __future__ import print_function > >from os import SEEK_SET, fstat, rename, unlink, environ >from os.path import exists >from sys import stderr >from argparse import ArgumentParser, ArgumentTypeError >from collections import namedtuple >from logging import basicConfig, getLogger, CRITICAL, DEBUG >from time import strftime, time >from ctypes import c_bool, c_ulong, sizeof, Structure >from errno import ENOENT >from contextlib import contextmanager >from itertools import chain >from subprocess import call > >import ldap >from ldap.dn import is_dn, str2dn >from ldap.ldapobject import ReconnectLDAPObject >from ldap.controls import SimplePagedResultsControl >from ldap.modlist import addModlist >from univention.config_registry import ConfigRegistry > >try: > from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Tuple, Type, NamedTuple # noqa F401 > from types import TracebackType # noqa F401 > from argparse import Namespace # noqa F401 > Transaction = NamedTuple("Transaction", [("tid", int), ("dn", str), ("command", str)]) >except ImportError: > Transaction = namedtuple("Transaction", ["tid", "dn", "command"]) # type: ignore > >LOG_FORMAT = "%(asctime)-15s:%(levelname)s:%(message)s" >BSIZE = 4096 >HELP_RESET = 'See <https://help.univention.com/t/how-to-reset-listener-notifier-replication/11710> for more details.' >HELP_RERUN = 'You can re-run this tool with the option "--fix" in order to try to fix this issue.' >HELP_FIX = 'See <https://help.univention.com/t/problem-umc-diagnostic-module-complains-about-problems-with-udn-replication/11707/1> for more details. ' >SLD = 'slapd.service' >UDN = 'univention-directory-notifier.service' >UDL = 'univention-directory-listener.service' >SERVICES = { > 'domaincontroller_master': (SLD, UDN, UDL), > 'domaincontroller_backup': (UDL, SLD, UDN), >} # type: Dict[str, Tuple[str, ...]] > > >class IndexHeader(Structure): > """ > Header for index file. > > .. warning:: > > The header is architecture dependant due to the use of `c_ulong`. > """ > MAGIC = 0x3395e0d4L > _fields_ = [("magic", c_ulong)] > _pack_ = 1 > > >class IndexEntry(Structure): > """ > Entry of index file. > > .. warning:: > > The header is architecture dependant due to the use of `c_ulong`. > > .. warning:: > > The source is compiled with `-D_FILE_OFFSET_BITS=64` which makes `off_t` 64 bit on i386, too, but `ulong` remains 32 bit. > """ > _fields_ = [("valid", c_bool), ("offset", c_ulong)] > _pack_ = 1 > > >class Index(object): > """ > Index to efficiently lookup transactions in the translog file. > :file:`/var/lib/univention-ldap/notify/transaction.index` > """ > > def __init__(self, filename): > # type: (str) -> None > self.filename = filename + ".index" > self.log = getLogger(__name__).getChild("Index") > self.index = None # type: Optional[BinaryIO] > self.size = 0 > self.count = 0 > > def __enter__(self): > # type: () -> Index > try: > self.index = index = open(self.filename, 'rb') > > data = index.read(sizeof(IndexHeader)) > assert data, "Empty index" > header = IndexHeader.from_buffer_copy(data) > assert header.magic == header.MAGIC, header.magic > > stat = fstat(index.fileno()) > self.size = size = stat.st_size > except EnvironmentError as ex: > self.log.warning("Failed to open %s: %s", self.filename, ex) > if ex.errno != ENOENT: > raise > > self.log.info("Creating empty %s", self.filename) > self.index = index = open(self.filename, 'wb+') > > header = IndexHeader(IndexHeader.MAGIC) > data = buffer(header)[:] # type: ignore > self.index.write(data) > > self.size = size = len(data) > > count, reminder = divmod(size - sizeof(IndexHeader), sizeof(IndexEntry)) > self.log.info("Index of size %d contains %d entries", size, count) > self.count = count - 1 if count else 0 # transaction 0 is never used > assert reminder == 0, reminder > > return self > > def __exit__(self, exc_type, exc_value, traceback): > # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None > assert self.index > self.index.close() > > def __getitem__(self, tid): > # type: (int) -> IndexEntry > """ > Return index entry for given transaction. > > :param tid: Transaction id. > :returns: The index entry. > :raises IndexEntry: for transactions before 1 or after the current count. > """ > if tid < 1 or tid > self.count: > raise IndexError(tid) > > assert self.index > self.seek(tid) > > data = self.index.read(sizeof(IndexEntry)) > assert data, "Invalid index entry" > entry = IndexEntry.from_buffer_copy(data) > self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset) > > return entry > > def __setitem__(self, tid, offset): > # type: (int, int) -> None > """ > Set index entry for given transaction. > > :param tid: Transaction id. > :param offset: File offset. > :raises IndexEntry: for transactions before 1. > """ > if tid < 1: > raise IndexError(tid) > > if tid > self.count: > self.count = tid > > assert self.index > self.seek(tid) > > entry = IndexEntry(True, offset) > data = buffer(entry)[:] # type: ignore > self.index.write(data) > self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset) > > def seek(self, tid): > # type: (int) -> None > """ > Seek to given transaction id. > > :param tid: Transaction id. > """ > assert 1 <= tid <= self.count > assert self.index > > pos = sizeof(IndexHeader) + tid * sizeof(IndexEntry) > self.log.debug("Looking up transaction %d at position %d in index", tid, pos) > self.index.seek(pos, SEEK_SET) > > def remove(self): > # type: () -> None > """ > Remove `.index` file. > """ > self.log.info('Removing %s ...', self.filename) > try: > unlink(self.filename) > except EnvironmentError as ex: > self.log.debug('unlink %s: %s', self.filename, ex) > if ex.errno != ENOENT: > raise > > >class Translog(object): > """ > Transactions log file. > :file:`/var/lib/univention-ldap/notify/transaction` > """ > > def __init__(self, filename, index=None): > # type: (str, Optional[Index]) -> None > self.filename = filename > self.index = index or Index(filename).__enter__() # type: Index > self.log = getLogger(__name__).getChild("Log") > self.translog = None # type: Optional[BinaryIO] > self.size = 0 > self.first = 0 > self.last = 0 > > @property > def count(self): > # type: () -> int > """ > Return count of transactions. > """ > if self.last == 0: > return 0 > return self.last - self.first + 1 > > @staticmethod > def parse_line(line): > # type: (str) -> Transaction > """ > Parse line from transaction line. > > :param line: One transaction line > :returns: 3-tuples (transaction_number, distinguished_name, command) > """ > line = line.strip() > tid, rest = line.split(' ', 1) > dn, command = rest.rsplit(' ', 1) > return Transaction(int(tid), dn, command) > > @staticmethod > def format_line(rec): > # type: (Transaction) -> str > return '%d %s %s\n' % rec > > def __enter__(self): > # type: () -> Translog > self.translog = translog = open(self.filename, 'r') > > stat = fstat(translog.fileno()) > self.size = size = stat.st_size > > if size: > line = translog.readline() > rec = self.parse_line(line) > assert 1 <= rec.tid, rec.tid > self.first = rec.tid > > last = self.index.count > if last: > self.log.debug("Seeking to last transaction %d...", last) > self.seek(last) > else: > offset = 0 if size < BSIZE else size - BSIZE > self.read(offset) > > for line in self.translog: > self.log.debug("Read line %r", line) > rec = self.parse_line(line) > assert last <= rec.tid, (last, rec.tid) > if last < rec.tid: > self.log.warn("Index=%d < translog=%d entries", last, rec.tid) > self.last = rec.tid > > self.translog.seek(0, SEEK_SET) > > return self > > def __exit__(self, exc_type, exc_value, traceback): > # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None > assert self.translog > self.translog.close() > > def __getitem__(self, tid): > # type: (int) -> Transaction > """ > Return given transaction. > > :param tid: Transaction id. > :returns: The transaction. > :raises IndexEntry: for transactions before 1 or after the current count. > """ > assert self.index > if tid < 1 or tid > self.index.count: > raise IndexError(tid) > > assert self.translog > self.seek(tid) > > line = self.translog.readline() > self.log.debug("Read line %r", line) > > rec = self.parse_line(line) > assert tid == rec.tid, rec.tid > > return rec > > def __iter__(self): > # type: () -> Iterator[Transaction] > """ > Iterate over all transactions. > """ > assert self.translog > for line in self.translog: > self.log.debug("Read line %r", line) > rec = self.parse_line(line) > yield rec > > def seek(self, tid): > # type: (int) -> int > """ > Seek to given transaction id. > > :param tid: Transaction id. > :returns: Offset. > """ > assert self.index > assert self.first <= tid <= self.index.count > > rec = self.index[tid] > if not rec.valid: > self.log.warn("Transaction %d is invalid", tid) > raise IndexError(tid) > pos = rec.offset > self.log.debug("Seeking to transaction %d at position %d", tid, pos) > assert 0 <= pos < self.size, pos > assert self.translog > self.translog.seek(pos, SEEK_SET) > return pos > > def read(self, offset): > # type: (int) -> Transaction > """ > Read next transaction after given offset. > > :param offset: absolute file offset. > :returns: The transaction. > :raises EOFError: if reading past the end. > """ > assert 0 <= offset <= self.size, offset > if offset >= self.size: > raise EOFError() > > assert self.translog > pos = max(0, offset - BSIZE) > self.translog.seek(pos, SEEK_SET) > data = self.translog.read(offset - pos) > self.log.debug("Read from %d: %r", pos, data) > > before = data.rsplit('\n', 1)[-1] > after = self.translog.readline() > self.log.debug("Read line %r %r", before, after) > > line = before + after > rec = self.parse_line(line) > > return rec > > >class Abort(Exception): > """ > Fatal abort. > """ > > >class DryRun(Exception): > """ > Abort dry-run mode. > """ > > >def main(): > # type: () -> int > """ > Work with transaction log in LDAP server. > """ > opt = parse_args() > > basicConfig(stream=stderr, level=max(DEBUG, CRITICAL - DEBUG * opt.verbose), format=LOG_FORMAT) > > try: > return opt.func(opt) or 0 > except Abort: > return 1 > > >@contextmanager >def ldapi(opt): > # type: (Namespace) -> Iterator[ReconnectLDAPObject] > """ > Return local LDAP connection. > > :param opt: Command line options. > :returns: A initialized LDAP connection. > """ > log = getLogger(__name__).getChild("ldap") > > log.debug("ldap_set_option(PROTOCOL_VERSION)") > ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) > log.debug("ldap_initialize(%s)", opt.translog_ldap) > ld = ReconnectLDAPObject( > opt.translog_ldap, > trace_level=max(0, opt.verbose - 4), > retry_max=3, > retry_delay=10.0, > ) > log.debug("ldap_bind()") > ld.sasl_external_bind_s() > > yield ld > > log.debug("ldap_unbind()") > ld.unbind_ext_s() > > >def import_all(opt): > # type: (Namespace) -> None > """ > Load transaction from file into LDAP server. > > :param opt: Command line options. > """ > log = getLogger(__name__).getChild("import") > log.info("Reading transactions from file '%s'...", opt.translog_file) > > with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog, ldapi(opt) as ld: > if not translog.size: > log.debug("File is empty; no transactions to process.") > return > > c_tid = max(1, translog.last - opt.count + 1) > log.debug("count.tid=%d count=%d", c_tid, opt.count) > > p_offset = int(translog.size * (1.0 - opt.percent)) > try: > p_trans = translog.read(p_offset) > p_tid = p_trans.tid > except EOFError: > p_tid = translog.last > log.debug("percent.tid=%d off=%d percent=%f", p_tid, p_offset, opt.percent) > > s_offset = max(0, translog.size - opt.size) > try: > s_trans = translog.read(s_offset) > s_tid = s_trans.tid > except EOFError: > s_tid = translog.last > log.debug("size.tid=%d off=%d size=%d", s_tid, s_offset, opt.size) > > start = max(1, min(c_tid, p_tid, s_tid), translog.first, opt.min) > assert translog.first <= start <= translog.last, start > end = min(opt.max or translog.last, translog.last) > > if opt.index and index.count: > log.info("Processing transactions %d .. %d", start, end) > translog.seek(start) > lead = [] # type: List[Transaction] > else: > offset = min(p_offset, s_offset) > log.info("Processing transaction from offset %d .. %d", offset, end) > rec = translog.read(offset) > lead = [rec] > > t_begin = t_last = time() > count = end - start > for rec in chain(lead, translog): > if rec.tid < start: > continue > if rec.tid > end: > break > > add_transaction(opt, ld, rec) > > if stderr.isatty(): > t_now = time() > if t_now - t_last >= 5.0: > p_done = float(rec.tid - start) / count > t_used = t_now - t_begin > stderr.write('\rprocessed {:d} of {:d} [{:.1f}%] in {:.1f}s, {:d} remaining in {:.1f}s \r'.format( > rec.tid - start, > count, > p_done * 100.0, > t_used, > end - rec.tid, > t_used / p_done - t_used, > )) > t_last = t_now > > >def load_single(opt): > # type: (Namespace) -> None > """ > Load given transactions into LDAP server. > > :param opt: Command line options. > """ > with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog, ldapi(opt) as ld: > for tid in opt.trans: > try: > rec = translog[tid] > except IndexError: > continue > > add_transaction(opt, ld, rec) > > >def show_stat(opt): > # type: (Namespace) -> None > """ > Show statistics > > :param opt: Command line options. > """ > with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog: > print("Index.file: %s" % index.filename) > print("Index.size: %d" % index.size) > print("Index.count: %d" % index.count) > print("Index.okay: %s" % ('yes' if index.count == translog.last else 'no',)) > print("Translog.file: %s" % translog.filename) > print("Translog.size: %d" % translog.size) > print("Translog.first: %d" % translog.first) > print("Translog.last: %d" % translog.last) > print("Translog.count: %d" % translog.count) > first, last = translog.first, translog.last > > tid = last > with ldapi(opt) as ld: > while tid >= first: > dn = "reqSession={},{}".format(tid, opt.translog_base) > try: > ld.search_ext_s(dn, ldap.SCOPE_BASE) > break > except ldap.NO_SUCH_OBJECT: > tid -= 1 > print("Ldap.last: %d" % tid) > print("Ldap.okay: %s" % ('yes' if last == tid else 'no',)) > > >def dump_index(opt): > # type: (Namespace) -> None > """ > Dump transaction index > > :param opt: Command line options. > """ > log = getLogger(__name__).getChild("INDEX") > > with Index(opt.translog_file) as index: > begin = opt.min > end = min(opt.max or index.count, index.count) > log.debug('Dumping index from [%d, %d] ...', begin, end) > for tid in xrange(begin, end + 1): > rec = index[tid] > print('%8d[%c]: %d' % (tid, 'x' if rec.valid else ' ', rec.offset)) > > >def reindex(opt): > # type: (Namespace) -> None > """ > Re-build transaction index > > :param opt: Command line options. > """ > log = getLogger(__name__).getChild("REINDEX") > > index = Index(opt.translog_file) > log.warning("Ignore the following 2 WARNINGS about the missing and mismatching index") > with stopped(opt, (UDN,)), rewrite(index.filename) as index.filename: > with index, Translog(opt.translog_file, index) as translog: > log.warning("Re-building index %s now...", index.filename) > assert translog.translog is not None > pos = 0 > for lnr, line in enumerate(translog.translog, start=1): > rec = translog.parse_line(line) > log.info("%d: tid=%d @ %d", lnr, rec.tid, pos) > index[rec.tid] = pos > pos += len(line.encode()) > > >def lookup(opt): > # type: (Namespace) -> None > """ > Lookup transactions. > > :param opt: Command line options. > """ > with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog: > for tid in opt.trans: > print(translog[tid]) > > >def lookup_ldap(opt): > # type: (Namespace) -> None > """ > Check transactions. > > :param opt: Command line options. > """ > log = getLogger(__name__).getChild("LDAP") > > with ldapi(opt) as ld: > for tid in opt.trans: > dn = "reqSession={},{}".format(tid, opt.translog_base) > try: > result = ld.search_ext_s(dn, ldap.SCOPE_BASE) > except ldap.NO_SUCH_OBJECT as ex: > if not opt.lenient: > log.critical("ldap_search(%s): %s", dn, ex.args[0]["desc"]) > raise Abort() > ((dn, attrs),) = result > print('tid={0[reqSession][0]} dn={0[reqDN][0]} command={0[reqType][0]}'.format(attrs)) > > >def add_transaction(opt, ld, rec): > # type: (Namespace, ReconnectLDAPObject, Transaction) -> None > """ > Load single transaction into LDAP server. > > :param opt: Command line options. > :param ld: LDAP server connection. > :param rec: Transaction to add. > :raises Abort: on fatal errors. > """ > log = getLogger(__name__).getChild("LDAP") > > dn = "reqSession={.tid},{}".format(rec, opt.translog_base) > modlist = addModlist({ > "objectClass": ["auditObject"], > "reqStart": [opt.datetime], > "reqType": ["{.command}".format(rec)], > "reqSession": ["{.tid}".format(rec)], > "reqDN": ["{.dn}".format(rec)], > }) > try: > log.debug("ldap_add(%s)", dn) > if not opt.dry_run: > ld.add_ext_s(dn, modlist) > except (ldap.ALREADY_EXISTS, ldap.INVALID_SYNTAX) as ex: > if not opt.lenient: > log.critical("ldap_add(%s): %s", rec, ex.args[0]["desc"]) > raise Abort() > log.error("ldap_add(%s): %s", rec, ex.args[0]["desc"]) > except ldap.LDAPError as ex: > log.critical("ldap_add(%s): %s", rec, ex) > raise Abort() > > >def check(opt): > # type: (Namespace) -> None > """ > Check transaction files for consistency. > > :param opt: Command line options. > """ > if opt.role not in SERVICES: > print('This command is only for UCS Master and Backups.') > exit(0) > > with stopped(opt, SERVICES[opt.role]): > translog = CheckTranslog(opt, opt.translog_file) > translog.run() > > listener = CheckListener(opt, opt.listener_file) > > if exists(opt.listener_private_file): > listener_priv = CheckListener(opt, opt.listener_private_file) > listener_priv.first_tid = translog.last_tid > listener_priv.first_line = translog.last_line > listener_priv.run() > > listener.first_tid = listener_priv.last_tid > listener.first_line = listener_priv.last_line > else: > listener.first_tid = translog.last_tid > listener.first_line = translog.last_line > > listener.run() > > if opt.role == 'domaincontroller_master': > check_last(opt, opt.last_file, listener.last_tid) > > # TODO: check cn=translog (Bug #49225) > > >@contextmanager >def stopped(opt, services): > # type: (Namespace, Tuple[str, ...]) -> Iterator > """ > Stop services while in context. > > :param opt: Command line options. > :param services: Sequence of service names. > """ > log = getLogger(__name__).getChild("SERVICE") > > if opt.skip_services or not opt.fix: > yield > return > > log.info("Stopping %s", services) > cmd = ('systemctl', 'stop') + services > call(cmd) > > try: > yield > except Exception: > raise > else: > log.info("Starting %s", services) > cmd = ('systemctl', 'start') + services > call(cmd) > > >@contextmanager >def rewrite(filename): > # type: (str) -> Iterator[str] > """ > Return name of temporary file to replace given file on success. > > :param filename: Name of file to replace on success. > :returns: Name of temporary file. > """ > log = getLogger(__name__).getChild("rewrite") > cur = filename > new = filename + '.new' > bak = filename + '.bak' > try: > yield new > except Exception as ex: > log.debug('abort: %s', ex) > if exists(new): > try: > unlink(new) > except EnvironmentError as ex: > log.debug('unlink %s: %s', new, ex) > if ex.errno != ENOENT: > raise > raise > else: > log.debug('renaming %s %s', cur, bak) > try: > rename(cur, bak) > except EnvironmentError as ex: > log.debug('rename %s %s: %s', cur, bak) > if ex.errno != ENOENT: > raise > > log.debug('renaming %s %s', new, cur) > rename(new, cur) > > >class CheckGeneric(object): > """ > Check transaction file for consistency. > """ > > def __init__(self, opt, filename): > # type: (Namespace, str) -> None > """ > Check transaction file for consistency. > > :param opt: Command line options. > :param filename: The name of the file containing transaction lines. > """ > self.opt = opt > self.filename = filename > self.log = getLogger(__name__).getChild("CHECK").getChild(self.__class__.__name__) > self.first_tid = 0 > self.first_line = '' > self.reset() > > def reset(self): > # type: () -> None > """ > Reset internal state. > """ > self.needs_fixing = False > self.needs_sort = False > self.needs_fill = False > self.needs_renum = False > self.needs_syntax = False > self.lnr = 0 > self.line = '' > self.last_line = self.first_line > self.last_tid = self.first_tid > > def error(self, string, *args, **kwargs): > # type: (str, *Any, **bool) -> None > """ > Flag error in file. > > :param string: A descriptive format string, which is formatted using any additional positional arguments. > :param bool fix: The error must be fixed. > :param bool sort: The error can be fixed by sorting the transactions. > :param bool fill: The error can be fixed by filling the hole with dummy entries. > :param bool renum: The error can be fixed by re-numbering all transactions. > :param bool syntax: The line contains syntax error needing manual fixing. > """ > self.log.error('%s:%d:%r: ' + string, self.filename, self.lnr, self.line, *args) > self.needs_fixing |= kwargs.get('fix', True) > self.needs_sort |= kwargs.get('sort', False) > self.needs_fill |= kwargs.get('fill', False) > self.needs_renum |= kwargs.get('renum', False) > self.needs_syntax |= kwargs.get('syntax', False) > > def run(self, filename=''): > # type: (str) -> None > """ > Run check on trasnaction file. > > :param filename: The name of the transaction file. Defaults to :py:attribute:`filename`. > """ > self.reset() > with open(filename or self.filename, 'r') as stream: > self.log.info("Reading %s ...", filename or self.filename) > for lnr, line, rec in self.loop(stream): > self.check_transaction(rec) > self.log.info("Reading %s done", filename or self.filename) > > if self.needs_fixing and filename == '': > print() > print('%s needs fixing:' % (self.filename,)) > if self.opt.role != 'domaincontroller_master': > print('This UCS system must be re-joined!') > print() > print(HELP_FIX) > exit(1) > > self.fixit() > > def loop(self, stream): > # type: (BinaryIO) -> Iterator[Tuple[int, str, Transaction]] > """ > Loop over all lines in the stream. > > :param stream: The input stream. > :returns: yields 3-tuples (line-number, original-line-content, parsed-transaction-record). > """ > for self.lnr, line in enumerate(stream, start=1): > self.line = line > try: > if line.endswith('\n'): > line = line[:-1] > else: > self.error("Missing newline line", fix=False) > > stripped = line.strip() > if line != stripped: > self.error("Extra whitespace", fix=False) > line = stripped > stripped = line.strip('\x00') > if line != stripped: > self.error("Binary zeros", fix=False) > line = stripped > > try: > rec = Translog.parse_line(line) > except ValueError: > # Ok let's try to identify the problem > try: > tid_, rest = line.split(' ', 1) > except ValueError: > self.error("Invalid line", syntax=True) > continue > try: > tid = int(tid_) > except ValueError: > tid = -1 > try: > dn, command = rest.split(' ', 1) > except ValueError: > self.error("Invalid line", syntax=True) > dn, command = rest, '' > > rec = Transaction(tid, dn, command) > > if not 0 < rec.tid: > self.error("Invalid transaction id", renum=True) > continue > > if rec.command not in {'r', 'd', 'a', 'm'}: # mod_Rdn Delete Add Modify > self.error("Invalid command") > # if self.opt.fix: > # rec = rec._replace(command='m') > > if not is_dn(rec.dn): > self.error("Invalid dn") > # if self.opt.fix: > # rec = rec._replace(dn=self.opt.base) > elif not rec.dn.endswith(self.opt.base): > self.error("Foreign dn", fix=False) > > yield (self.lnr, line, rec) > > self.last_tid = rec.tid > finally: > self.last_line = line > > def check_transaction(self, rec): > # type: (Transaction) -> None > """ > Check transaction for consistency. > > :param rec: The transaction record to check. > """ > raise NotImplementedError() > > def fixit(self): > # type: () -> None > """ > Fix transactions. > """ > raise NotImplementedError() > > def fix_syntax(self): > # type: () -> None > # TODO: Loop over lines and > # - strip binary zeros > # - add missing command in 3rd column > # - optional: fake invalid DNs in 2nd column > # - optional: fake transaction number > # see commented out code in :py:meth:`loop`. > """ > Handle syntax errors. TODO > """ > self.log.info('Syntax ...') > > print('- contains unparseable lines!') > print() > print(HELP_RESET) > exit(1) > > def fix_sort(self): > # type: () -> None > """ > Handle transactions not being sorted strong monotonically increasing. > """ > self.log.info('Sorting ...') > print('- the transactions are not sorted uniquely') > > with rewrite(self.filename) as tmp: > cmd = ( > 'sort', > '--ignore-leading-blanks', > '--ignore-nonprinting', > '--key', '1n', # transaction ID > '--key', '2', # DN > '--key', '3r', # command 'r', 'm', 'd', 'a' > '--unique', > '--output', tmp, > self.filename, > ) > env = dict(environ) > env['LC_ALL'] = 'C.UTF-8' > self.log.debug('Running %r ...', cmd) > if call(cmd, env=env): > print('- failed to sort file!') > print('Check available space of filesystem?') > exit(2) > > self.run(filename=tmp) > > if self.needs_sort: > print('- still contains duplicate transactions after unique sorting!') > print('UCS Master must be reset and all other UCS systems must be re-joined!') > print() > print(HELP_RESET) > exit(1) > > if not self.opt.fix: > print() > print(HELP_RERUN) > print(HELP_FIX) > exit(1) > > self.remove_index() > self.log.info('Sorting done') > > def fix_fill(self): > # type: () -> None > """ > Handle transactions not being consecutive. > """ > self.log.info('Filling holes ...') > print('- missing transactions in sequence') > > if not self.opt.fix: > print() > print(HELP_RERUN) > print(HELP_FIX) > exit(1) > > with rewrite(self.filename) as tmp: > with open(self.filename, 'r') as old, open(tmp, 'w') as new: > next_tid = 0 > for line in old: > try: > rec = Translog.parse_line(line) > assert next_tid == 0 or next_tid <= rec.tid, (next_tid, rec.tid) > while next_tid != 0 and next_tid < rec.tid: > self.log.info('Filling %d ...', next_tid) > fill_rec = Transaction(tid=next_tid, dn=self.opt.base, command='m') > fill_line = Translog.format_line(fill_rec) > new.write(fill_line) > next_tid += 1 > > next_tid = rec.tid + 1 > except ValueError: > pass > new.write(line) > > self.run(filename=tmp) > > assert not self.needs_fill > > self.remove_index() > self.log.info('Filling holes done') > > def fix_renumber(self): > # type: () -> None > """ > Handle transactions being out-of-order. > """ > self.log.info('Renumbering ...') > print('- pending transactions are not consecutive') > > if not self.opt.fix: > print() > print(HELP_RERUN) > print(HELP_FIX) > exit(1) > > with rewrite(self.filename) as tmp: > with open(self.filename, 'r') as old, open(tmp, 'w') as new: > for self.last_tid, line in enumerate(old, start=self.first_tid + 1): > line = line.rstrip('\n') > self.log.debug('Renum %r to %d ...', line, self.last_tid) > try: > rec = Translog.parse_line(line) > except ValueError: > try: > tid_, rest = line.split(' ', 1) > except ValueError: > raise > try: > tid = int(tid_) > except ValueError: > tid = -1 > try: > dn, command = rest.split(' ', 1) > except ValueError: > raise > rec = Transaction(tid, dn, command) > > rec = rec._replace(tid=self.last_tid) > line = Translog.format_line(rec) > new.write(line) > > self.run(filename=tmp) > > assert not self.needs_renum > self.log.info('Renumbering done') > > def remove_index(self): > # type: () -> None > """ > Remove any associated index file. > """ > raise NotImplementedError() > > >class CheckTranslog(CheckGeneric): > """ > Check transactions log file for consistency. > :file:`/var/lib/univention-ldap/notify/transaction` > """ > > def check_transaction(self, rec): > # type: (Transaction) -> None > """ > Check committed transaction. > > :param rec: The transaction record to check. > """ > if self.lnr == 1: > self.log.info("%s:%d: Starts with: %r", self.filename, self.lnr, self.line) > elif rec.tid <= self.last_tid: > self.error("Repeated line after %r", self.last_line, sort=True) > elif self.last_tid + 1 != rec.tid: > self.error("Hole after %r", self.last_line, fill=True) > > def fixit(self): > # type: () -> None > """ > Fix transactions. > """ > if self.needs_syntax: > self.fix_syntax() > if self.needs_sort: > self.fix_sort() > if self.needs_fill: > self.fix_fill() > > def remove_index(self): > # type: () -> None > """ > Remove `.index` file. > """ > Index(self.filename).remove() > > >class CheckListener(CheckGeneric): > """ > Check pending transactions log file for consistency. > :file:`/var/lib/univention-ldap/listener/listener` > """ > > def check_transaction(self, rec): > # type: (Transaction) -> None > """ > Check pending transaction. > > :param rec: Transaction record. > """ > if self.last_tid + 1 == rec.tid: > pass > elif rec.tid <= self.last_tid: > self.error("Repeated line %r", self.last_line, renum=True) > elif self.lnr == 1: > self.error("Not continuous with %s: %r", self.opt.translog_file, self.last_line, renum=True) > else: > self.error("Hole after %r", self.last_line, renum=True) > > def fixit(self): > # type: () -> None > """ > Fix transactions. > """ > if self.needs_syntax: > self.fix_syntax() > if self.needs_renum: > self.fix_renumber() > > def remove_index(self): > # type: () -> None > """ > No associated `.index` file to remove. > """ > pass > > >def check_last(opt, filename, last_tid): > # type: (Namespace, str, int) -> None > """ > Check last used transactions id. > :file:`/var/lib/univention-ldap/last_id` > > :param opt: Command line options. > :param filename: File name. > :param last_tid: Expected transaction id. > """ > log = getLogger(__name__).getChild("CHECK") > > try: > with open(filename, 'r') as last: > line = last.read() > if not line: > raise ValueError('should be %d, but is empty' % (last_tid,)) > tid = int(line) > if tid != last_tid: > raise ValueError('should be %d, but is %d' % (last_tid, tid)) > except (EnvironmentError, ValueError) as ex: > log.error("%s: Invalid last id: %s", filename, ex) > if opt.fix: > with open(filename, 'w') as last: > last.write('%d' % (last_tid,)) > log.debug("%s: Updated to: %d", filename, last_tid) > else: > print() > print('%s needs manual fixing!' % (filename,)) > print() > print(HELP_RERUN) > print(HELP_FIX) > exit(1) > > >def prune(opt): > # type: (Namespace) -> None > """ > Prune old transactions. > > :param opt: Command line options. > """ > if opt.role not in SERVICES: > print('This command is only for UCS Master and Backups.') > exit(0) > > opt.skip_services = opt.dry_run > with stopped(opt, (UDN,)): > try: > prune_file(opt) > except DryRun: > pass > prune_ldap(opt) > > >def prune_file(opt): > """ > Prune old transactions from file. > > :param opt: Command line options. > """ > log = getLogger(__name__).getChild("prune").getChild("file") > > with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog: > if opt.trans < 0: > log.info('Keeping last %d transactions before %d', -opt.trans, translog.last) > opt.trans = translog.last + opt.trans + 1 > > log.info('Purging/keeping %d..%d..%d', translog.first, opt.trans, translog.last) > if opt.trans <= translog.first: > log.fatal('Already purged.') > raise Abort() > if opt.trans >= translog.last: > log.fatal('Nothing to purge.') > raise Abort() > assert translog.first < opt.trans < translog.last > > with rewrite(opt.translog_file) as tmp, open(tmp, 'w') as new: > for rec in translog: > if rec.tid < opt.trans: > log.debug('Dropping %d', rec.tid) > continue > line = Translog.format_line(rec) > new.write(line) > > if opt.dry_run: > raise DryRun() > > index.remove() > > >def prune_ldap(opt): > """ > Prune old transactions from LDAP. > > :param opt: Command line options. > """ > log = getLogger(__name__).getChild("prune").getChild("ldap") > > base = opt.translog_base > scope = ldap.SCOPE_ONELEVEL > filterstr = '(reqSession=*)' > attrlist = ['1.1'] # LDAP_NO_ATTRS > page_control = SimplePagedResultsControl(True, size=1000, cookie='') > with ldapi(opt) as ld: > while True: > response = ld.search_ext(base, scope, filterstr, attrlist, serverctrls=[page_control]) > rtype, rdata, rmsgid, serverctrls = ld.result3(response) > > for control in serverctrls: > if control.controlType == SimplePagedResultsControl.controlType: > break > else: > log.fatal('Server ignores RFC 2696 control: %r', serverctrls) > raise Abort() > > for (dn, attrs) in rdata: > ava = str2dn(dn) > for (attr, value, _flags) in ava[0]: > if attr == 'reqSession': > break > else: > continue > tid = int(value) > if tid >= opt.trans: > log.debug('Keeping %s', dn) > continue > try: > log.debug("ldap_delete(%s)", dn) > if not opt.dry_run: > ld.delete_ext_s(dn) > log.info("Deleted %s", dn) > except ldap.LDAPError as ex: > log.error("ldap_delete(%s): %s", dn, ex) > > if not control.cookie: > break > page_control.cookie = control.cookie > > >def parse_args(args=None): > # type: (List[str]) -> Namespace > """ > Parse command line arguments. > > :param args: the list of arguments to process (default: `sys.argv[1:]`) > :returns: a Namespace instance. > """ > ucr = ConfigRegistry() > ucr.load() > > parser = ArgumentParser(description=__doc__) > parser.add_argument("--translog-file", "-T", metavar="FILENAME", help="Transaction file [%(default)s]", default="/var/lib/univention-ldap/notify/transaction") > parser.add_argument("--translog-ldap", "-H", metavar="URL", help="LDAP URL [%(default)s]", default="ldapi:///") > parser.add_argument("--translog-base", "-B", metavar="DN", help="LDAP base for translog [%(default)s]", default="cn=translog") > parser.add_argument("--lenient", "-l", action="store_true", help="Ignore existing entries") > parser.add_argument("--verbose", "-v", action="count", help="Increase verbosity", default=2) > parser.add_argument("--dry-run", "-n", action="store_true", help="Do not modify anything") > parser.add_argument("--datetime", "-d", help="Overwrite time-stamp for import [%(default)s]", default=strftime("%Y%m%d%H%M%SZ")) > subparsers = parser.add_subparsers(title="subcommands", description="valid subcommands") > > parser_import = subparsers.add_parser("import", help="Import transaction file into LDAP") > parser_import.add_argument("--index", "-i", action="store_false", help="Do not use index to find transactions") > import_amount = parser_import.add_argument_group("minimum", description="Amount of transactions to import") > import_amount.add_argument("--count", "-c", type=parse_int_pos, help="Minimum number of transaction [%(default)s]", default=100000) > import_amount.add_argument("--percent", "-p", type=parse_percent, help="Minimum percentage of transaction file [%(default)s%%]", default="0") > import_amount.add_argument("--size", "-s", type=parse_size, help="Minimum number of bytes from transaction file [%(default)s]", default="10M") > import_limit = parser_import.add_argument_group("limit", description="Limit transactions to process") > import_limit.add_argument("--min", "-m", metavar="TID", type=parse_int_pos, help="First transaction ID to process [%(default)s]", default=1) > import_limit.add_argument("--max", "-M", metavar="TID", type=parse_int_pos, help="Last transaction ID to process") > parser_import.set_defaults(func=import_all) > > parser_load = subparsers.add_parser("load", help="Load specified transactions into LDAP") > parser_load.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to process") > parser_load.add_argument("--index", "-i", action="store_false", help="Do not use index to find transactions") > parser_load.set_defaults(func=load_single) > > parser_lookup = subparsers.add_parser("lookup", help="Lookup transaction in file") > parser_lookup.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to lookup") > parser_lookup.set_defaults(func=lookup) > > parser_ldap = subparsers.add_parser("ldap", help="Lookup transaction in LDAP") > parser_ldap.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to lookup") > parser_ldap.set_defaults(func=lookup_ldap) > > parser_stat = subparsers.add_parser("stat", help="Show statistics") > parser_stat.set_defaults(func=show_stat) > > parser_index = subparsers.add_parser("index", help="Dump index file") > parser_index.add_argument("--min", "-m", metavar="TID", type=parse_int_pos, help="First transaction ID to process [%(default)s]", default=1) > parser_index.add_argument("--max", "-M", metavar="TID", type=parse_int_pos, help="Last transaction ID to process") > parser_index.set_defaults(func=dump_index) > > parser_reindex = subparsers.add_parser("reindex", help="Re-build index file") > parser_reindex.add_argument("--skip-services", "-S", action="store_true", help="Skip stopping and starting services") > parser_reindex.set_defaults(func=reindex, fix=True) > > parser_check = subparsers.add_parser("check", help="Check transaction files") > parser_check.add_argument("--fix", "-f", action="store_true", help="Try to fix issues") > parser_check.add_argument("--base", "-b", metavar="DN", help="LDAP base [%(default)s]", default=ucr['ldap/base']) > parser_check.add_argument("--role", "-r", metavar="ROLE", help="Server role [%(default)s]", default=ucr['server/role']) > parser_check.add_argument("--listener-file", "-L", metavar="FILENAME", help="Listener file [%(default)s]", default="/var/lib/univention-ldap/listener/listener") > parser_check.add_argument("--listener-private-file", "-P", metavar="FILENAME", help="Listener private file [%(default)s]", default="/var/lib/univention-ldap/listener/listener.priv") > parser_check.add_argument("--last-file", "-I", metavar="FILENAME", help="Last_id file [%(default)s]", default="/var/lib/univention-ldap/last_id") > parser_check.add_argument("--skip-services", "-S", action="store_true", help="Skip stopping and starting services") > parser_check.set_defaults(func=check) > > parser_prune = subparsers.add_parser("prune", help="Prune transaction files") > parser_prune.add_argument("trans", metavar="TID", type=int, help="Oldest transaction number to keep (negative numbers: the number of transactions to keep)") > parser_prune.add_argument("--role", "-r", metavar="ROLE", help="Server role [%(default)s]", default=ucr['server/role']) > parser_prune.set_defaults(func=prune, fix=True) > > opt = parser.parse_args(args) > > return opt > > >def parse_int_pos(string): > # type: (str) -> long > """ > Parse positive integer string. > > :param string: The command line string. > :returns: The parsed integer. > :raises ArgumentTypeError: if the string is valid. > """ > try: > val = long(string) > if 0 < val: > return val > except ValueError: > pass > raise ArgumentTypeError("Positive integer required") > > >def parse_percent(string): > # type: (str) -> float > """ > Parse percentage string. > > :param string: The command line string. > :returns: The parsed percentage. > :raises ArgumentTypeError: if the string is valid. > """ > try: > val = float(string.rstrip('%')) > if not 0.0 <= val <= 100.0: > raise ValueError() > except ValueError: > raise ArgumentTypeError("Invalid percentage") > return val / 100.0 > > >def parse_size(string): > # type: (str) -> long > """ > Parse size string. > > :param string: The command line string. > :returns: The parsed size. > :raises ArgumentTypeError: if the string is valid. > """ > suffix = string.lstrip(".0123456789") > try: > unit, = suffix.rstrip("iIbB").upper() or ' ' > scale = 1L << (10 * " KMGTPE".index(unit)) > except ValueError: > raise ArgumentTypeError("Invalid unit") > > prefix = string[:-len(suffix)] if suffix else string > try: > value = float(prefix) > if not 0.0 <= value: > raise ValueError() > except ValueError: > raise ArgumentTypeError("Invalid value") > > return long(value * scale) > > >if __name__ == "__main__": > exit(main()) >
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Raw
Actions:
View
Attachments on
bug 55863
: 11049