#!/usr/bin/python2.7
# vim: set fileencoding=utf-8 filetype=python foldmethod=expr expandtab shiftwidth=4 tabstop=4 :
# Copyright 2019-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# .
"""Univention Directory Notifier Transaction log admin commmand"""
from __future__ import print_function
from os import SEEK_SET, fstat, rename, unlink, environ
from os.path import exists
from sys import stderr
from argparse import ArgumentParser, ArgumentTypeError
from collections import namedtuple
from logging import basicConfig, getLogger, CRITICAL, DEBUG
from time import strftime, time
from ctypes import c_bool, c_ulong, sizeof, Structure
from errno import ENOENT
from contextlib import contextmanager
from itertools import chain
from subprocess import call
import ldap
from ldap.dn import is_dn, str2dn
from ldap.ldapobject import ReconnectLDAPObject
from ldap.controls import SimplePagedResultsControl
from ldap.modlist import addModlist
from univention.config_registry import ConfigRegistry
try:
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Tuple, Type, NamedTuple # noqa F401
from types import TracebackType # noqa F401
from argparse import Namespace # noqa F401
Transaction = NamedTuple("Transaction", [("tid", int), ("dn", str), ("command", str)])
except ImportError:
Transaction = namedtuple("Transaction", ["tid", "dn", "command"]) # type: ignore
LOG_FORMAT = "%(asctime)-15s:%(levelname)s:%(message)s"
BSIZE = 4096
HELP_RESET = 'See for more details.'
HELP_RERUN = 'You can re-run this tool with the option "--fix" in order to try to fix this issue.'
HELP_FIX = 'See for more details. '
SLD = 'slapd.service'
UDN = 'univention-directory-notifier.service'
UDL = 'univention-directory-listener.service'
SERVICES = {
'domaincontroller_master': (SLD, UDN, UDL),
'domaincontroller_backup': (UDL, SLD, UDN),
} # type: Dict[str, Tuple[str, ...]]
class IndexHeader(Structure):
"""
Header for index file.
.. warning::
The header is architecture dependant due to the use of `c_ulong`.
"""
MAGIC = 0x3395e0d4L
_fields_ = [("magic", c_ulong)]
_pack_ = 1
class IndexEntry(Structure):
"""
Entry of index file.
.. warning::
The header is architecture dependant due to the use of `c_ulong`.
.. warning::
The source is compiled with `-D_FILE_OFFSET_BITS=64` which makes `off_t` 64 bit on i386, too, but `ulong` remains 32 bit.
"""
_fields_ = [("valid", c_bool), ("offset", c_ulong)]
_pack_ = 1
class Index(object):
"""
Index to efficiently lookup transactions in the translog file.
:file:`/var/lib/univention-ldap/notify/transaction.index`
"""
def __init__(self, filename):
# type: (str) -> None
self.filename = filename + ".index"
self.log = getLogger(__name__).getChild("Index")
self.index = None # type: Optional[BinaryIO]
self.size = 0
self.count = 0
def __enter__(self):
# type: () -> Index
try:
self.index = index = open(self.filename, 'rb')
data = index.read(sizeof(IndexHeader))
assert data, "Empty index"
header = IndexHeader.from_buffer_copy(data)
assert header.magic == header.MAGIC, header.magic
stat = fstat(index.fileno())
self.size = size = stat.st_size
except EnvironmentError as ex:
self.log.warning("Failed to open %s: %s", self.filename, ex)
if ex.errno != ENOENT:
raise
self.log.info("Creating empty %s", self.filename)
self.index = index = open(self.filename, 'wb+')
header = IndexHeader(IndexHeader.MAGIC)
data = buffer(header)[:] # type: ignore
self.index.write(data)
self.size = size = len(data)
count, reminder = divmod(size - sizeof(IndexHeader), sizeof(IndexEntry))
self.log.info("Index of size %d contains %d entries", size, count)
self.count = count - 1 if count else 0 # transaction 0 is never used
assert reminder == 0, reminder
return self
def __exit__(self, exc_type, exc_value, traceback):
# type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
assert self.index
self.index.close()
def __getitem__(self, tid):
# type: (int) -> IndexEntry
"""
Return index entry for given transaction.
:param tid: Transaction id.
:returns: The index entry.
:raises IndexEntry: for transactions before 1 or after the current count.
"""
if tid < 1 or tid > self.count:
raise IndexError(tid)
assert self.index
self.seek(tid)
data = self.index.read(sizeof(IndexEntry))
assert data, "Invalid index entry"
entry = IndexEntry.from_buffer_copy(data)
self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset)
return entry
def __setitem__(self, tid, offset):
# type: (int, int) -> None
"""
Set index entry for given transaction.
:param tid: Transaction id.
:param offset: File offset.
:raises IndexEntry: for transactions before 1.
"""
if tid < 1:
raise IndexError(tid)
if tid > self.count:
self.count = tid
assert self.index
self.seek(tid)
entry = IndexEntry(True, offset)
data = buffer(entry)[:] # type: ignore
self.index.write(data)
self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset)
def seek(self, tid):
# type: (int) -> None
"""
Seek to given transaction id.
:param tid: Transaction id.
"""
assert 1 <= tid <= self.count
assert self.index
pos = sizeof(IndexHeader) + tid * sizeof(IndexEntry)
self.log.debug("Looking up transaction %d at position %d in index", tid, pos)
self.index.seek(pos, SEEK_SET)
def remove(self):
# type: () -> None
"""
Remove `.index` file.
"""
self.log.info('Removing %s ...', self.filename)
try:
unlink(self.filename)
except EnvironmentError as ex:
self.log.debug('unlink %s: %s', self.filename, ex)
if ex.errno != ENOENT:
raise
class Translog(object):
"""
Transactions log file.
:file:`/var/lib/univention-ldap/notify/transaction`
"""
def __init__(self, filename, index=None):
# type: (str, Optional[Index]) -> None
self.filename = filename
self.index = index or Index(filename).__enter__() # type: Index
self.log = getLogger(__name__).getChild("Log")
self.translog = None # type: Optional[BinaryIO]
self.size = 0
self.first = 0
self.last = 0
@property
def count(self):
# type: () -> int
"""
Return count of transactions.
"""
if self.last == 0:
return 0
return self.last - self.first + 1
@staticmethod
def parse_line(line):
# type: (str) -> Transaction
"""
Parse line from transaction line.
:param line: One transaction line
:returns: 3-tuples (transaction_number, distinguished_name, command)
"""
line = line.strip()
tid, rest = line.split(' ', 1)
dn, command = rest.rsplit(' ', 1)
return Transaction(int(tid), dn, command)
@staticmethod
def format_line(rec):
# type: (Transaction) -> str
return '%d %s %s\n' % rec
def __enter__(self):
# type: () -> Translog
self.translog = translog = open(self.filename, 'r')
stat = fstat(translog.fileno())
self.size = size = stat.st_size
if size:
line = translog.readline()
rec = self.parse_line(line)
assert 1 <= rec.tid, rec.tid
self.first = rec.tid
last = self.index.count
if last:
self.log.debug("Seeking to last transaction %d...", last)
self.seek(last)
else:
offset = 0 if size < BSIZE else size - BSIZE
self.read(offset)
for line in self.translog:
self.log.debug("Read line %r", line)
rec = self.parse_line(line)
assert last <= rec.tid, (last, rec.tid)
if last < rec.tid:
self.log.warn("Index=%d < translog=%d entries", last, rec.tid)
self.last = rec.tid
self.translog.seek(0, SEEK_SET)
return self
def __exit__(self, exc_type, exc_value, traceback):
# type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
assert self.translog
self.translog.close()
def __getitem__(self, tid):
# type: (int) -> Transaction
"""
Return given transaction.
:param tid: Transaction id.
:returns: The transaction.
:raises IndexEntry: for transactions before 1 or after the current count.
"""
assert self.index
if tid < 1 or tid > self.index.count:
raise IndexError(tid)
assert self.translog
self.seek(tid)
line = self.translog.readline()
self.log.debug("Read line %r", line)
rec = self.parse_line(line)
assert tid == rec.tid, rec.tid
return rec
def __iter__(self):
# type: () -> Iterator[Transaction]
"""
Iterate over all transactions.
"""
assert self.translog
for line in self.translog:
self.log.debug("Read line %r", line)
rec = self.parse_line(line)
yield rec
def seek(self, tid):
# type: (int) -> int
"""
Seek to given transaction id.
:param tid: Transaction id.
:returns: Offset.
"""
assert self.index
assert self.first <= tid <= self.index.count
rec = self.index[tid]
if not rec.valid:
self.log.warn("Transaction %d is invalid", tid)
raise IndexError(tid)
pos = rec.offset
self.log.debug("Seeking to transaction %d at position %d", tid, pos)
assert 0 <= pos < self.size, pos
assert self.translog
self.translog.seek(pos, SEEK_SET)
return pos
def read(self, offset):
# type: (int) -> Transaction
"""
Read next transaction after given offset.
:param offset: absolute file offset.
:returns: The transaction.
:raises EOFError: if reading past the end.
"""
assert 0 <= offset <= self.size, offset
if offset >= self.size:
raise EOFError()
assert self.translog
pos = max(0, offset - BSIZE)
self.translog.seek(pos, SEEK_SET)
data = self.translog.read(offset - pos)
self.log.debug("Read from %d: %r", pos, data)
before = data.rsplit('\n', 1)[-1]
after = self.translog.readline()
self.log.debug("Read line %r %r", before, after)
line = before + after
rec = self.parse_line(line)
return rec
class Abort(Exception):
"""
Fatal abort.
"""
class DryRun(Exception):
"""
Abort dry-run mode.
"""
def main():
# type: () -> int
"""
Work with transaction log in LDAP server.
"""
opt = parse_args()
basicConfig(stream=stderr, level=max(DEBUG, CRITICAL - DEBUG * opt.verbose), format=LOG_FORMAT)
try:
return opt.func(opt) or 0
except Abort:
return 1
@contextmanager
def ldapi(opt):
# type: (Namespace) -> Iterator[ReconnectLDAPObject]
"""
Return local LDAP connection.
:param opt: Command line options.
:returns: A initialized LDAP connection.
"""
log = getLogger(__name__).getChild("ldap")
log.debug("ldap_set_option(PROTOCOL_VERSION)")
ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3)
log.debug("ldap_initialize(%s)", opt.translog_ldap)
ld = ReconnectLDAPObject(
opt.translog_ldap,
trace_level=max(0, opt.verbose - 4),
retry_max=3,
retry_delay=10.0,
)
log.debug("ldap_bind()")
ld.sasl_external_bind_s()
yield ld
log.debug("ldap_unbind()")
ld.unbind_ext_s()
def import_all(opt):
# type: (Namespace) -> None
"""
Load transaction from file into LDAP server.
:param opt: Command line options.
"""
log = getLogger(__name__).getChild("import")
log.info("Reading transactions from file '%s'...", opt.translog_file)
with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog, ldapi(opt) as ld:
if not translog.size:
log.debug("File is empty; no transactions to process.")
return
c_tid = max(1, translog.last - opt.count + 1)
log.debug("count.tid=%d count=%d", c_tid, opt.count)
p_offset = int(translog.size * (1.0 - opt.percent))
try:
p_trans = translog.read(p_offset)
p_tid = p_trans.tid
except EOFError:
p_tid = translog.last
log.debug("percent.tid=%d off=%d percent=%f", p_tid, p_offset, opt.percent)
s_offset = max(0, translog.size - opt.size)
try:
s_trans = translog.read(s_offset)
s_tid = s_trans.tid
except EOFError:
s_tid = translog.last
log.debug("size.tid=%d off=%d size=%d", s_tid, s_offset, opt.size)
start = max(1, min(c_tid, p_tid, s_tid), translog.first, opt.min)
assert translog.first <= start <= translog.last, start
end = min(opt.max or translog.last, translog.last)
if opt.index and index.count:
log.info("Processing transactions %d .. %d", start, end)
translog.seek(start)
lead = [] # type: List[Transaction]
else:
offset = min(p_offset, s_offset)
log.info("Processing transaction from offset %d .. %d", offset, end)
rec = translog.read(offset)
lead = [rec]
t_begin = t_last = time()
count = end - start
for rec in chain(lead, translog):
if rec.tid < start:
continue
if rec.tid > end:
break
add_transaction(opt, ld, rec)
if stderr.isatty():
t_now = time()
if t_now - t_last >= 5.0:
p_done = float(rec.tid - start) / count
t_used = t_now - t_begin
stderr.write('\rprocessed {:d} of {:d} [{:.1f}%] in {:.1f}s, {:d} remaining in {:.1f}s \r'.format(
rec.tid - start,
count,
p_done * 100.0,
t_used,
end - rec.tid,
t_used / p_done - t_used,
))
t_last = t_now
def load_single(opt):
# type: (Namespace) -> None
"""
Load given transactions into LDAP server.
:param opt: Command line options.
"""
with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog, ldapi(opt) as ld:
for tid in opt.trans:
try:
rec = translog[tid]
except IndexError:
continue
add_transaction(opt, ld, rec)
def show_stat(opt):
# type: (Namespace) -> None
"""
Show statistics
:param opt: Command line options.
"""
with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog:
print("Index.file: %s" % index.filename)
print("Index.size: %d" % index.size)
print("Index.count: %d" % index.count)
print("Index.okay: %s" % ('yes' if index.count == translog.last else 'no',))
print("Translog.file: %s" % translog.filename)
print("Translog.size: %d" % translog.size)
print("Translog.first: %d" % translog.first)
print("Translog.last: %d" % translog.last)
print("Translog.count: %d" % translog.count)
first, last = translog.first, translog.last
tid = last
with ldapi(opt) as ld:
while tid >= first:
dn = "reqSession={},{}".format(tid, opt.translog_base)
try:
ld.search_ext_s(dn, ldap.SCOPE_BASE)
break
except ldap.NO_SUCH_OBJECT:
tid -= 1
print("Ldap.last: %d" % tid)
print("Ldap.okay: %s" % ('yes' if last == tid else 'no',))
def dump_index(opt):
# type: (Namespace) -> None
"""
Dump transaction index
:param opt: Command line options.
"""
log = getLogger(__name__).getChild("INDEX")
with Index(opt.translog_file) as index:
begin = opt.min
end = min(opt.max or index.count, index.count)
log.debug('Dumping index from [%d, %d] ...', begin, end)
for tid in xrange(begin, end + 1):
rec = index[tid]
print('%8d[%c]: %d' % (tid, 'x' if rec.valid else ' ', rec.offset))
def reindex(opt):
# type: (Namespace) -> None
"""
Re-build transaction index
:param opt: Command line options.
"""
log = getLogger(__name__).getChild("REINDEX")
index = Index(opt.translog_file)
log.warning("Ignore the following 2 WARNINGS about the missing and mismatching index")
with stopped(opt, (UDN,)), rewrite(index.filename) as index.filename:
with index, Translog(opt.translog_file, index) as translog:
log.warning("Re-building index %s now...", index.filename)
assert translog.translog is not None
pos = 0
for lnr, line in enumerate(translog.translog, start=1):
rec = translog.parse_line(line)
log.info("%d: tid=%d @ %d", lnr, rec.tid, pos)
index[rec.tid] = pos
pos += len(line.encode())
def lookup(opt):
# type: (Namespace) -> None
"""
Lookup transactions.
:param opt: Command line options.
"""
with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog:
for tid in opt.trans:
print(translog[tid])
def lookup_ldap(opt):
# type: (Namespace) -> None
"""
Check transactions.
:param opt: Command line options.
"""
log = getLogger(__name__).getChild("LDAP")
with ldapi(opt) as ld:
for tid in opt.trans:
dn = "reqSession={},{}".format(tid, opt.translog_base)
try:
result = ld.search_ext_s(dn, ldap.SCOPE_BASE)
except ldap.NO_SUCH_OBJECT as ex:
if not opt.lenient:
log.critical("ldap_search(%s): %s", dn, ex.args[0]["desc"])
raise Abort()
((dn, attrs),) = result
print('tid={0[reqSession][0]} dn={0[reqDN][0]} command={0[reqType][0]}'.format(attrs))
def add_transaction(opt, ld, rec):
# type: (Namespace, ReconnectLDAPObject, Transaction) -> None
"""
Load single transaction into LDAP server.
:param opt: Command line options.
:param ld: LDAP server connection.
:param rec: Transaction to add.
:raises Abort: on fatal errors.
"""
log = getLogger(__name__).getChild("LDAP")
dn = "reqSession={.tid},{}".format(rec, opt.translog_base)
modlist = addModlist({
"objectClass": ["auditObject"],
"reqStart": [opt.datetime],
"reqType": ["{.command}".format(rec)],
"reqSession": ["{.tid}".format(rec)],
"reqDN": ["{.dn}".format(rec)],
})
try:
log.debug("ldap_add(%s)", dn)
if not opt.dry_run:
ld.add_ext_s(dn, modlist)
except (ldap.ALREADY_EXISTS, ldap.INVALID_SYNTAX) as ex:
if not opt.lenient:
log.critical("ldap_add(%s): %s", rec, ex.args[0]["desc"])
raise Abort()
log.error("ldap_add(%s): %s", rec, ex.args[0]["desc"])
except ldap.LDAPError as ex:
log.critical("ldap_add(%s): %s", rec, ex)
raise Abort()
def check(opt):
# type: (Namespace) -> None
"""
Check transaction files for consistency.
:param opt: Command line options.
"""
if opt.role not in SERVICES:
print('This command is only for UCS Master and Backups.')
exit(0)
with stopped(opt, SERVICES[opt.role]):
translog = CheckTranslog(opt, opt.translog_file)
translog.run()
listener = CheckListener(opt, opt.listener_file)
if exists(opt.listener_private_file):
listener_priv = CheckListener(opt, opt.listener_private_file)
listener_priv.first_tid = translog.last_tid
listener_priv.first_line = translog.last_line
listener_priv.run()
listener.first_tid = listener_priv.last_tid
listener.first_line = listener_priv.last_line
else:
listener.first_tid = translog.last_tid
listener.first_line = translog.last_line
listener.run()
if opt.role == 'domaincontroller_master':
check_last(opt, opt.last_file, listener.last_tid)
# TODO: check cn=translog (Bug #49225)
@contextmanager
def stopped(opt, services):
# type: (Namespace, Tuple[str, ...]) -> Iterator
"""
Stop services while in context.
:param opt: Command line options.
:param services: Sequence of service names.
"""
log = getLogger(__name__).getChild("SERVICE")
if opt.skip_services or not opt.fix:
yield
return
log.info("Stopping %s", services)
cmd = ('systemctl', 'stop') + services
call(cmd)
try:
yield
except Exception:
raise
else:
log.info("Starting %s", services)
cmd = ('systemctl', 'start') + services
call(cmd)
@contextmanager
def rewrite(filename):
# type: (str) -> Iterator[str]
"""
Return name of temporary file to replace given file on success.
:param filename: Name of file to replace on success.
:returns: Name of temporary file.
"""
log = getLogger(__name__).getChild("rewrite")
cur = filename
new = filename + '.new'
bak = filename + '.bak'
try:
yield new
except Exception as ex:
log.debug('abort: %s', ex)
if exists(new):
try:
unlink(new)
except EnvironmentError as ex:
log.debug('unlink %s: %s', new, ex)
if ex.errno != ENOENT:
raise
raise
else:
log.debug('renaming %s %s', cur, bak)
try:
rename(cur, bak)
except EnvironmentError as ex:
log.debug('rename %s %s: %s', cur, bak)
if ex.errno != ENOENT:
raise
log.debug('renaming %s %s', new, cur)
rename(new, cur)
class CheckGeneric(object):
"""
Check transaction file for consistency.
"""
def __init__(self, opt, filename):
# type: (Namespace, str) -> None
"""
Check transaction file for consistency.
:param opt: Command line options.
:param filename: The name of the file containing transaction lines.
"""
self.opt = opt
self.filename = filename
self.log = getLogger(__name__).getChild("CHECK").getChild(self.__class__.__name__)
self.first_tid = 0
self.first_line = ''
self.reset()
def reset(self):
# type: () -> None
"""
Reset internal state.
"""
self.needs_fixing = False
self.needs_sort = False
self.needs_fill = False
self.needs_renum = False
self.needs_syntax = False
self.lnr = 0
self.line = ''
self.last_line = self.first_line
self.last_tid = self.first_tid
def error(self, string, *args, **kwargs):
# type: (str, *Any, **bool) -> None
"""
Flag error in file.
:param string: A descriptive format string, which is formatted using any additional positional arguments.
:param bool fix: The error must be fixed.
:param bool sort: The error can be fixed by sorting the transactions.
:param bool fill: The error can be fixed by filling the hole with dummy entries.
:param bool renum: The error can be fixed by re-numbering all transactions.
:param bool syntax: The line contains syntax error needing manual fixing.
"""
self.log.error('%s:%d:%r: ' + string, self.filename, self.lnr, self.line, *args)
self.needs_fixing |= kwargs.get('fix', True)
self.needs_sort |= kwargs.get('sort', False)
self.needs_fill |= kwargs.get('fill', False)
self.needs_renum |= kwargs.get('renum', False)
self.needs_syntax |= kwargs.get('syntax', False)
def run(self, filename=''):
# type: (str) -> None
"""
Run check on trasnaction file.
:param filename: The name of the transaction file. Defaults to :py:attribute:`filename`.
"""
self.reset()
with open(filename or self.filename, 'r') as stream:
self.log.info("Reading %s ...", filename or self.filename)
for lnr, line, rec in self.loop(stream):
self.check_transaction(rec)
self.log.info("Reading %s done", filename or self.filename)
if self.needs_fixing and filename == '':
print()
print('%s needs fixing:' % (self.filename,))
if self.opt.role != 'domaincontroller_master':
print('This UCS system must be re-joined!')
print()
print(HELP_FIX)
exit(1)
self.fixit()
def loop(self, stream):
# type: (BinaryIO) -> Iterator[Tuple[int, str, Transaction]]
"""
Loop over all lines in the stream.
:param stream: The input stream.
:returns: yields 3-tuples (line-number, original-line-content, parsed-transaction-record).
"""
for self.lnr, line in enumerate(stream, start=1):
self.line = line
try:
if line.endswith('\n'):
line = line[:-1]
else:
self.error("Missing newline line", fix=False)
stripped = line.strip()
if line != stripped:
self.error("Extra whitespace", fix=False)
line = stripped
stripped = line.strip('\x00')
if line != stripped:
self.error("Binary zeros", fix=False)
line = stripped
try:
rec = Translog.parse_line(line)
except ValueError:
# Ok let's try to identify the problem
try:
tid_, rest = line.split(' ', 1)
except ValueError:
self.error("Invalid line", syntax=True)
continue
try:
tid = int(tid_)
except ValueError:
tid = -1
try:
dn, command = rest.split(' ', 1)
except ValueError:
self.error("Invalid line", syntax=True)
dn, command = rest, ''
rec = Transaction(tid, dn, command)
if not 0 < rec.tid:
self.error("Invalid transaction id", renum=True)
continue
if rec.command not in {'r', 'd', 'a', 'm'}: # mod_Rdn Delete Add Modify
self.error("Invalid command")
# if self.opt.fix:
# rec = rec._replace(command='m')
if not is_dn(rec.dn):
self.error("Invalid dn")
# if self.opt.fix:
# rec = rec._replace(dn=self.opt.base)
elif not rec.dn.endswith(self.opt.base):
self.error("Foreign dn", fix=False)
yield (self.lnr, line, rec)
self.last_tid = rec.tid
finally:
self.last_line = line
def check_transaction(self, rec):
# type: (Transaction) -> None
"""
Check transaction for consistency.
:param rec: The transaction record to check.
"""
raise NotImplementedError()
def fixit(self):
# type: () -> None
"""
Fix transactions.
"""
raise NotImplementedError()
def fix_syntax(self):
# type: () -> None
# TODO: Loop over lines and
# - strip binary zeros
# - add missing command in 3rd column
# - optional: fake invalid DNs in 2nd column
# - optional: fake transaction number
# see commented out code in :py:meth:`loop`.
"""
Handle syntax errors. TODO
"""
self.log.info('Syntax ...')
print('- contains unparseable lines!')
print()
print(HELP_RESET)
exit(1)
def fix_sort(self):
# type: () -> None
"""
Handle transactions not being sorted strong monotonically increasing.
"""
self.log.info('Sorting ...')
print('- the transactions are not sorted uniquely')
with rewrite(self.filename) as tmp:
cmd = (
'sort',
'--ignore-leading-blanks',
'--ignore-nonprinting',
'--key', '1n', # transaction ID
'--key', '2', # DN
'--key', '3r', # command 'r', 'm', 'd', 'a'
'--unique',
'--output', tmp,
self.filename,
)
env = dict(environ)
env['LC_ALL'] = 'C.UTF-8'
self.log.debug('Running %r ...', cmd)
if call(cmd, env=env):
print('- failed to sort file!')
print('Check available space of filesystem?')
exit(2)
self.run(filename=tmp)
if self.needs_sort:
print('- still contains duplicate transactions after unique sorting!')
print('UCS Master must be reset and all other UCS systems must be re-joined!')
print()
print(HELP_RESET)
exit(1)
if not self.opt.fix:
print()
print(HELP_RERUN)
print(HELP_FIX)
exit(1)
self.remove_index()
self.log.info('Sorting done')
def fix_fill(self):
# type: () -> None
"""
Handle transactions not being consecutive.
"""
self.log.info('Filling holes ...')
print('- missing transactions in sequence')
if not self.opt.fix:
print()
print(HELP_RERUN)
print(HELP_FIX)
exit(1)
with rewrite(self.filename) as tmp:
with open(self.filename, 'r') as old, open(tmp, 'w') as new:
next_tid = 0
for line in old:
try:
rec = Translog.parse_line(line)
assert next_tid == 0 or next_tid <= rec.tid, (next_tid, rec.tid)
while next_tid != 0 and next_tid < rec.tid:
self.log.info('Filling %d ...', next_tid)
fill_rec = Transaction(tid=next_tid, dn=self.opt.base, command='m')
fill_line = Translog.format_line(fill_rec)
new.write(fill_line)
next_tid += 1
next_tid = rec.tid + 1
except ValueError:
pass
new.write(line)
self.run(filename=tmp)
assert not self.needs_fill
self.remove_index()
self.log.info('Filling holes done')
def fix_renumber(self):
# type: () -> None
"""
Handle transactions being out-of-order.
"""
self.log.info('Renumbering ...')
print('- pending transactions are not consecutive')
if not self.opt.fix:
print()
print(HELP_RERUN)
print(HELP_FIX)
exit(1)
with rewrite(self.filename) as tmp:
with open(self.filename, 'r') as old, open(tmp, 'w') as new:
for self.last_tid, line in enumerate(old, start=self.first_tid + 1):
line = line.rstrip('\n')
self.log.debug('Renum %r to %d ...', line, self.last_tid)
try:
rec = Translog.parse_line(line)
except ValueError:
try:
tid_, rest = line.split(' ', 1)
except ValueError:
raise
try:
tid = int(tid_)
except ValueError:
tid = -1
try:
dn, command = rest.split(' ', 1)
except ValueError:
raise
rec = Transaction(tid, dn, command)
rec = rec._replace(tid=self.last_tid)
line = Translog.format_line(rec)
new.write(line)
self.run(filename=tmp)
assert not self.needs_renum
self.log.info('Renumbering done')
def remove_index(self):
# type: () -> None
"""
Remove any associated index file.
"""
raise NotImplementedError()
class CheckTranslog(CheckGeneric):
"""
Check transactions log file for consistency.
:file:`/var/lib/univention-ldap/notify/transaction`
"""
def check_transaction(self, rec):
# type: (Transaction) -> None
"""
Check committed transaction.
:param rec: The transaction record to check.
"""
if self.lnr == 1:
self.log.info("%s:%d: Starts with: %r", self.filename, self.lnr, self.line)
elif rec.tid <= self.last_tid:
self.error("Repeated line after %r", self.last_line, sort=True)
elif self.last_tid + 1 != rec.tid:
self.error("Hole after %r", self.last_line, fill=True)
def fixit(self):
# type: () -> None
"""
Fix transactions.
"""
if self.needs_syntax:
self.fix_syntax()
if self.needs_sort:
self.fix_sort()
if self.needs_fill:
self.fix_fill()
def remove_index(self):
# type: () -> None
"""
Remove `.index` file.
"""
Index(self.filename).remove()
class CheckListener(CheckGeneric):
"""
Check pending transactions log file for consistency.
:file:`/var/lib/univention-ldap/listener/listener`
"""
def check_transaction(self, rec):
# type: (Transaction) -> None
"""
Check pending transaction.
:param rec: Transaction record.
"""
if self.last_tid + 1 == rec.tid:
pass
elif rec.tid <= self.last_tid:
self.error("Repeated line %r", self.last_line, renum=True)
elif self.lnr == 1:
self.error("Not continuous with %s: %r", self.opt.translog_file, self.last_line, renum=True)
else:
self.error("Hole after %r", self.last_line, renum=True)
def fixit(self):
# type: () -> None
"""
Fix transactions.
"""
if self.needs_syntax:
self.fix_syntax()
if self.needs_renum:
self.fix_renumber()
def remove_index(self):
# type: () -> None
"""
No associated `.index` file to remove.
"""
pass
def check_last(opt, filename, last_tid):
# type: (Namespace, str, int) -> None
"""
Check last used transactions id.
:file:`/var/lib/univention-ldap/last_id`
:param opt: Command line options.
:param filename: File name.
:param last_tid: Expected transaction id.
"""
log = getLogger(__name__).getChild("CHECK")
try:
with open(filename, 'r') as last:
line = last.read()
if not line:
raise ValueError('should be %d, but is empty' % (last_tid,))
tid = int(line)
if tid != last_tid:
raise ValueError('should be %d, but is %d' % (last_tid, tid))
except (EnvironmentError, ValueError) as ex:
log.error("%s: Invalid last id: %s", filename, ex)
if opt.fix:
with open(filename, 'w') as last:
last.write('%d' % (last_tid,))
log.debug("%s: Updated to: %d", filename, last_tid)
else:
print()
print('%s needs manual fixing!' % (filename,))
print()
print(HELP_RERUN)
print(HELP_FIX)
exit(1)
def prune(opt):
# type: (Namespace) -> None
"""
Prune old transactions.
:param opt: Command line options.
"""
if opt.role not in SERVICES:
print('This command is only for UCS Master and Backups.')
exit(0)
opt.skip_services = opt.dry_run
with stopped(opt, (UDN,)):
try:
prune_file(opt)
except DryRun:
pass
prune_ldap(opt)
def prune_file(opt):
"""
Prune old transactions from file.
:param opt: Command line options.
"""
log = getLogger(__name__).getChild("prune").getChild("file")
with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog:
if opt.trans < 0:
log.info('Keeping last %d transactions before %d', -opt.trans, translog.last)
opt.trans = translog.last + opt.trans + 1
log.info('Purging/keeping %d..%d..%d', translog.first, opt.trans, translog.last)
if opt.trans <= translog.first:
log.fatal('Already purged.')
raise Abort()
if opt.trans >= translog.last:
log.fatal('Nothing to purge.')
raise Abort()
assert translog.first < opt.trans < translog.last
with rewrite(opt.translog_file) as tmp, open(tmp, 'w') as new:
for rec in translog:
if rec.tid < opt.trans:
log.debug('Dropping %d', rec.tid)
continue
line = Translog.format_line(rec)
new.write(line)
if opt.dry_run:
raise DryRun()
index.remove()
def prune_ldap(opt):
"""
Prune old transactions from LDAP.
:param opt: Command line options.
"""
log = getLogger(__name__).getChild("prune").getChild("ldap")
base = opt.translog_base
scope = ldap.SCOPE_ONELEVEL
filterstr = '(reqSession=*)'
attrlist = ['1.1'] # LDAP_NO_ATTRS
page_control = SimplePagedResultsControl(True, size=1000, cookie='')
with ldapi(opt) as ld:
while True:
response = ld.search_ext(base, scope, filterstr, attrlist, serverctrls=[page_control])
rtype, rdata, rmsgid, serverctrls = ld.result3(response)
for control in serverctrls:
if control.controlType == SimplePagedResultsControl.controlType:
break
else:
log.fatal('Server ignores RFC 2696 control: %r', serverctrls)
raise Abort()
for (dn, attrs) in rdata:
ava = str2dn(dn)
for (attr, value, _flags) in ava[0]:
if attr == 'reqSession':
break
else:
continue
tid = int(value)
if tid >= opt.trans:
log.debug('Keeping %s', dn)
continue
try:
log.debug("ldap_delete(%s)", dn)
if not opt.dry_run:
ld.delete_ext_s(dn)
log.info("Deleted %s", dn)
except ldap.LDAPError as ex:
log.error("ldap_delete(%s): %s", dn, ex)
if not control.cookie:
break
page_control.cookie = control.cookie
def parse_args(args=None):
# type: (List[str]) -> Namespace
"""
Parse command line arguments.
:param args: the list of arguments to process (default: `sys.argv[1:]`)
:returns: a Namespace instance.
"""
ucr = ConfigRegistry()
ucr.load()
parser = ArgumentParser(description=__doc__)
parser.add_argument("--translog-file", "-T", metavar="FILENAME", help="Transaction file [%(default)s]", default="/var/lib/univention-ldap/notify/transaction")
parser.add_argument("--translog-ldap", "-H", metavar="URL", help="LDAP URL [%(default)s]", default="ldapi:///")
parser.add_argument("--translog-base", "-B", metavar="DN", help="LDAP base for translog [%(default)s]", default="cn=translog")
parser.add_argument("--lenient", "-l", action="store_true", help="Ignore existing entries")
parser.add_argument("--verbose", "-v", action="count", help="Increase verbosity", default=2)
parser.add_argument("--dry-run", "-n", action="store_true", help="Do not modify anything")
parser.add_argument("--datetime", "-d", help="Overwrite time-stamp for import [%(default)s]", default=strftime("%Y%m%d%H%M%SZ"))
subparsers = parser.add_subparsers(title="subcommands", description="valid subcommands")
parser_import = subparsers.add_parser("import", help="Import transaction file into LDAP")
parser_import.add_argument("--index", "-i", action="store_false", help="Do not use index to find transactions")
import_amount = parser_import.add_argument_group("minimum", description="Amount of transactions to import")
import_amount.add_argument("--count", "-c", type=parse_int_pos, help="Minimum number of transaction [%(default)s]", default=100000)
import_amount.add_argument("--percent", "-p", type=parse_percent, help="Minimum percentage of transaction file [%(default)s%%]", default="0")
import_amount.add_argument("--size", "-s", type=parse_size, help="Minimum number of bytes from transaction file [%(default)s]", default="10M")
import_limit = parser_import.add_argument_group("limit", description="Limit transactions to process")
import_limit.add_argument("--min", "-m", metavar="TID", type=parse_int_pos, help="First transaction ID to process [%(default)s]", default=1)
import_limit.add_argument("--max", "-M", metavar="TID", type=parse_int_pos, help="Last transaction ID to process")
parser_import.set_defaults(func=import_all)
parser_load = subparsers.add_parser("load", help="Load specified transactions into LDAP")
parser_load.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to process")
parser_load.add_argument("--index", "-i", action="store_false", help="Do not use index to find transactions")
parser_load.set_defaults(func=load_single)
parser_lookup = subparsers.add_parser("lookup", help="Lookup transaction in file")
parser_lookup.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to lookup")
parser_lookup.set_defaults(func=lookup)
parser_ldap = subparsers.add_parser("ldap", help="Lookup transaction in LDAP")
parser_ldap.add_argument("trans", metavar="TID", type=parse_int_pos, nargs='+', help="Transaction number to lookup")
parser_ldap.set_defaults(func=lookup_ldap)
parser_stat = subparsers.add_parser("stat", help="Show statistics")
parser_stat.set_defaults(func=show_stat)
parser_index = subparsers.add_parser("index", help="Dump index file")
parser_index.add_argument("--min", "-m", metavar="TID", type=parse_int_pos, help="First transaction ID to process [%(default)s]", default=1)
parser_index.add_argument("--max", "-M", metavar="TID", type=parse_int_pos, help="Last transaction ID to process")
parser_index.set_defaults(func=dump_index)
parser_reindex = subparsers.add_parser("reindex", help="Re-build index file")
parser_reindex.add_argument("--skip-services", "-S", action="store_true", help="Skip stopping and starting services")
parser_reindex.set_defaults(func=reindex, fix=True)
parser_check = subparsers.add_parser("check", help="Check transaction files")
parser_check.add_argument("--fix", "-f", action="store_true", help="Try to fix issues")
parser_check.add_argument("--base", "-b", metavar="DN", help="LDAP base [%(default)s]", default=ucr['ldap/base'])
parser_check.add_argument("--role", "-r", metavar="ROLE", help="Server role [%(default)s]", default=ucr['server/role'])
parser_check.add_argument("--listener-file", "-L", metavar="FILENAME", help="Listener file [%(default)s]", default="/var/lib/univention-ldap/listener/listener")
parser_check.add_argument("--listener-private-file", "-P", metavar="FILENAME", help="Listener private file [%(default)s]", default="/var/lib/univention-ldap/listener/listener.priv")
parser_check.add_argument("--last-file", "-I", metavar="FILENAME", help="Last_id file [%(default)s]", default="/var/lib/univention-ldap/last_id")
parser_check.add_argument("--skip-services", "-S", action="store_true", help="Skip stopping and starting services")
parser_check.set_defaults(func=check)
parser_prune = subparsers.add_parser("prune", help="Prune transaction files")
parser_prune.add_argument("trans", metavar="TID", type=int, help="Oldest transaction number to keep (negative numbers: the number of transactions to keep)")
parser_prune.add_argument("--role", "-r", metavar="ROLE", help="Server role [%(default)s]", default=ucr['server/role'])
parser_prune.set_defaults(func=prune, fix=True)
opt = parser.parse_args(args)
return opt
def parse_int_pos(string):
# type: (str) -> long
"""
Parse positive integer string.
:param string: The command line string.
:returns: The parsed integer.
:raises ArgumentTypeError: if the string is valid.
"""
try:
val = long(string)
if 0 < val:
return val
except ValueError:
pass
raise ArgumentTypeError("Positive integer required")
def parse_percent(string):
# type: (str) -> float
"""
Parse percentage string.
:param string: The command line string.
:returns: The parsed percentage.
:raises ArgumentTypeError: if the string is valid.
"""
try:
val = float(string.rstrip('%'))
if not 0.0 <= val <= 100.0:
raise ValueError()
except ValueError:
raise ArgumentTypeError("Invalid percentage")
return val / 100.0
def parse_size(string):
# type: (str) -> long
"""
Parse size string.
:param string: The command line string.
:returns: The parsed size.
:raises ArgumentTypeError: if the string is valid.
"""
suffix = string.lstrip(".0123456789")
try:
unit, = suffix.rstrip("iIbB").upper() or ' '
scale = 1L << (10 * " KMGTPE".index(unit))
except ValueError:
raise ArgumentTypeError("Invalid unit")
prefix = string[:-len(suffix)] if suffix else string
try:
value = float(prefix)
if not 0.0 <= value:
raise ValueError()
except ValueError:
raise ArgumentTypeError("Invalid value")
return long(value * scale)
if __name__ == "__main__":
exit(main())