#!/usr/bin/python2.7
# vim: set fileencoding=utf-8 filetype=python foldmethod=expr expandtab shiftwidth=4 tabstop=4 :
# Copyright 2019-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""Univention Directory Notifier Transaction log admin command"""

from __future__ import print_function

from os import SEEK_SET, fstat, rename, unlink, environ
from os.path import exists
from sys import stderr
from argparse import ArgumentParser, ArgumentTypeError
from collections import namedtuple
from logging import basicConfig, getLogger, CRITICAL, DEBUG
from time import strftime, time
from ctypes import c_bool, c_ulong, sizeof, Structure
from errno import ENOENT
from contextlib import contextmanager
from itertools import chain
from subprocess import call

import ldap
from ldap.dn import is_dn, str2dn
from ldap.ldapobject import ReconnectLDAPObject
from ldap.controls import SimplePagedResultsControl
from ldap.modlist import addModlist
from univention.config_registry import ConfigRegistry

# ``Transaction`` is the parsed form of one translog line: (tid, dn, command).
# When the ``typing`` module is importable a typed ``NamedTuple`` is used, which
# also brings the names used in the type comments into scope; otherwise a plain
# ``namedtuple`` with the same field names is used instead.
try:
    from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Tuple, Type, NamedTuple  # noqa F401
    from types import TracebackType  # noqa F401
    from argparse import Namespace  # noqa F401
    Transaction = NamedTuple("Transaction", [("tid", int), ("dn", str), ("command", str)])
except ImportError:
    Transaction = namedtuple("Transaction", ["tid", "dn", "command"])  # type: ignore

# Format string passed to `logging.basicConfig()` in `main()`.
LOG_FORMAT = "%(asctime)-15s:%(levelname)s:%(message)s"
# Number of bytes `Translog` looks back / reads from the tail when locating a transaction line.
BSIZE = 4096
# Help texts printed by the consistency checks.
HELP_RESET = 'See <https://help.univention.com/t/how-to-reset-listener-notifier-replication/11710> for more details.'
HELP_RERUN = 'You can re-run this tool with the option "--fix" in order to try to fix this issue.'
HELP_FIX = 'See <https://help.univention.com/t/problem-umc-diagnostic-module-complains-about-problems-with-udn-replication/11707/1> for more details. '
# systemd unit names handed to `systemctl stop|start` by `stopped()`.
SLD = 'slapd.service'
UDN = 'univention-directory-notifier.service'
UDL = 'univention-directory-listener.service'
# Services to stop while `check()` runs, keyed by the server role (`opt.role`).
# Roles missing here are rejected by `check()`.
SERVICES = {
    'domaincontroller_master': (SLD, UDN, UDL),
    'domaincontroller_backup': (UDL, SLD, UDN),
}  # type: Dict[str, Tuple[str, ...]]


class IndexHeader(Structure):
    """
    Header for index file.

    The header consists of a single magic number, which is compared against
    :py:attr:`MAGIC` when an existing index file is opened.

    .. warning::

        The header is architecture dependent due to the use of `c_ulong`.
    """
    # No `L` suffix: Python 2 promotes integers automatically and the suffix
    # is a syntax error in Python 3.
    MAGIC = 0x3395e0d4
    _fields_ = [("magic", c_ulong)]
    _pack_ = 1


class IndexEntry(Structure):
    """
    Entry of index file.

    Each entry stores a validity flag and the byte offset that
    :py:meth:`Translog.seek` uses to position the transaction file.

    .. warning::

        The entry is architecture dependent due to the use of `c_ulong`.

    .. warning::

        The source is compiled with `-D_FILE_OFFSET_BITS=64` which makes `off_t` 64 bit on i386, too, but `ulong` remains 32 bit.
    """
    # `_pack_ = 1` requests a layout without padding between the two fields.
    _fields_ = [("valid", c_bool), ("offset", c_ulong)]
    _pack_ = 1


class Index(object):
    """
    Index to efficiently lookup transactions in the translog file.
    :file:`/var/lib/univention-ldap/notify/transaction.index`

    The file consists of one :py:class:`IndexHeader` followed by one
    :py:class:`IndexEntry` per transaction id. The slot of transaction 0 exists
    but is never used.

    The class is a context manager: the file is opened by :py:meth:`__enter__`
    and closed by :py:meth:`__exit__`.
    """

    def __init__(self, filename):
        # type: (str) -> None
        """
        :param filename: Name of the transaction file; `.index` is appended to get the name of the index file.
        """
        self.filename = filename + ".index"
        self.log = getLogger(__name__).getChild("Index")
        self.index = None  # type: Optional[BinaryIO]
        self.size = 0
        self.count = 0

    def __enter__(self):
        # type: () -> Index
        """
        Open the index file read-only or create a new empty one if it does not exist.

        :returns: This instance.
        :raises EnvironmentError: if the file cannot be opened for any other reason than not existing.
        :raises AssertionError: if the file is empty, has the wrong magic or a size not matching whole entries.
        """
        try:
            try:
                self.index = index = open(self.filename, 'rb')

                data = index.read(sizeof(IndexHeader))
                assert data, "Empty index"
                header = IndexHeader.from_buffer_copy(data)
                assert header.magic == header.MAGIC, header.magic

                stat = fstat(index.fileno())
                self.size = size = stat.st_size
            except EnvironmentError as ex:
                self.log.warning("Failed to open %s: %s", self.filename, ex)
                if ex.errno != ENOENT:
                    raise

                self.log.info("Creating empty %s", self.filename)
                self.index = index = open(self.filename, 'wb+')

                header = IndexHeader(IndexHeader.MAGIC)
                data = buffer(header)[:]  # type: ignore
                self.index.write(data)

                self.size = size = len(data)

            count, reminder = divmod(size - sizeof(IndexHeader), sizeof(IndexEntry))
            self.log.info("Index of size %d contains %d entries", size, count)
            self.count = count - 1 if count else 0  # transaction 0 is never used
            assert reminder == 0, reminder
        except BaseException:
            # `__exit__` is not called when `__enter__` fails, so the file must be closed here.
            if self.index is not None:
                self.index.close()
            raise

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
        """
        Close the index file.
        """
        assert self.index
        self.index.close()

    def __getitem__(self, tid):
        # type: (int) -> IndexEntry
        """
        Return index entry for given transaction.

        :param tid: Transaction id.
        :returns: The index entry.
        :raises IndexError: for transactions before 1 or after the current count.
        """
        if tid < 1 or tid > self.count:
            raise IndexError(tid)

        assert self.index
        self.seek(tid)

        data = self.index.read(sizeof(IndexEntry))
        assert data, "Invalid index entry"
        entry = IndexEntry.from_buffer_copy(data)
        self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset)

        return entry

    def __setitem__(self, tid, offset):
        # type: (int, int) -> None
        """
        Set index entry for given transaction.

        The entry is written as valid. The count grows when `tid` is beyond the current count.

        :param tid: Transaction id.
        :param offset: File offset.
        :raises IndexError: for transactions before 1.
        """
        if tid < 1:
            raise IndexError(tid)

        if tid > self.count:
            self.count = tid

        assert self.index
        self.seek(tid)

        entry = IndexEntry(True, offset)
        data = buffer(entry)[:]  # type: ignore
        self.index.write(data)
        self.log.debug("Transaction %d is %s and at position %d", tid, entry.valid, entry.offset)

    def seek(self, tid):
        # type: (int) -> None
        """
        Seek to given transaction id.

        :param tid: Transaction id.
        """
        assert 1 <= tid <= self.count
        assert self.index

        # Entries are addressed directly by transaction id; slot 0 is the unused transaction 0.
        pos = sizeof(IndexHeader) + tid * sizeof(IndexEntry)
        self.log.debug("Looking up transaction %d at position %d in index", tid, pos)
        self.index.seek(pos, SEEK_SET)

    def remove(self):
        # type: () -> None
        """
        Remove `.index` file.

        A missing file is not an error.

        :raises EnvironmentError: if the file cannot be removed for any other reason.
        """
        self.log.info('Removing %s ...', self.filename)
        try:
            unlink(self.filename)
        except EnvironmentError as ex:
            self.log.debug('unlink %s: %s', self.filename, ex)
            if ex.errno != ENOENT:
                raise


class Translog(object):
    """
    Transactions log file.
    :file:`/var/lib/univention-ldap/notify/transaction`

    Each line has the form `<tid> <dn> <command>`.

    The class is a context manager: the file is opened by :py:meth:`__enter__`
    and closed by :py:meth:`__exit__`.
    """

    def __init__(self, filename, index=None):
        # type: (str, Optional[Index]) -> None
        """
        :param filename: Name of the transaction file.
        :param index: An already opened index. If not given, the index belonging to `filename` is opened here and closed again by :py:meth:`__exit__`.
        """
        self.filename = filename
        if index:
            self._own_index = False
            self.index = index  # type: Index
        else:
            # Remember that the index was opened here so that it is closed again later on.
            self._own_index = True
            self.index = Index(filename).__enter__()
        self.log = getLogger(__name__).getChild("Log")
        self.translog = None  # type: Optional[BinaryIO]
        self.size = 0
        self.first = 0
        self.last = 0

    @property
    def count(self):
        # type: () -> int
        """
        Return count of transactions.
        """
        if self.last == 0:
            return 0
        return self.last - self.first + 1

    @staticmethod
    def parse_line(line):
        # type: (str) -> Transaction
        """
        Parse line from transaction line.

        :param line: One transaction line
        :returns: 3-tuples (transaction_number, distinguished_name, command)
        :raises ValueError: if the line does not consist of a number, a DN and a command.
        """
        line = line.strip()
        tid, rest = line.split(' ', 1)
        # The DN may contain blanks itself, so the command is split off from the right.
        dn, command = rest.rsplit(' ', 1)
        return Transaction(int(tid), dn, command)

    @staticmethod
    def format_line(rec):
        # type: (Transaction) -> str
        """
        Format transaction as line for the transaction file.

        :param rec: The transaction.
        :returns: The line including the trailing newline.
        """
        return '%d %s %s\n' % rec

    def __enter__(self):
        # type: () -> Translog
        """
        Open the transaction file and determine its size and the first and last transaction id.

        :returns: This instance.
        """
        self.translog = translog = open(self.filename, 'r')
        try:
            stat = fstat(translog.fileno())
            self.size = size = stat.st_size

            if size:
                line = translog.readline()
                rec = self.parse_line(line)
                assert 1 <= rec.tid, rec.tid
                self.first = rec.tid

                last = self.index.count
                if last:
                    self.log.debug("Seeking to last transaction %d...", last)
                    self.seek(last)
                else:
                    # No usable index: start with the transaction found near the end of the file.
                    offset = 0 if size < BSIZE else size - BSIZE
                    rec = self.read(offset)

                # Skip forward to the very last line. If nothing follows, the
                # transaction found above already is the last one.
                tail = None
                for tail in translog:
                    self.log.debug("Read line %r", tail)
                if tail is not None:
                    rec = self.parse_line(tail)
                assert last <= rec.tid, (last, rec.tid)
                if last < rec.tid:
                    self.log.warning("Index=%d < translog=%d entries", last, rec.tid)
                self.last = rec.tid

                translog.seek(0, SEEK_SET)
        except BaseException:
            # `__exit__` is not called when `__enter__` fails, so the file must be closed here.
            translog.close()
            raise

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
        """
        Close the transaction file and the index, if the latter was opened by this instance.
        """
        assert self.translog
        self.translog.close()
        if self._own_index:
            self.index.__exit__(exc_type, exc_value, traceback)

    def __getitem__(self, tid):
        # type: (int) -> Transaction
        """
        Return given transaction.

        :param tid: Transaction id.
        :returns: The transaction.
        :raises IndexError: for transactions before 1 or after the current count of the index or if the index entry is invalid.
        """
        assert self.index
        if tid < 1 or tid > self.index.count:
            raise IndexError(tid)

        assert self.translog
        self.seek(tid)

        line = self.translog.readline()
        self.log.debug("Read line %r", line)

        rec = self.parse_line(line)
        assert tid == rec.tid, rec.tid

        return rec

    def __iter__(self):
        # type: () -> Iterator[Transaction]
        """
        Iterate over all transactions starting at the current file position.
        """
        assert self.translog
        for line in self.translog:
            self.log.debug("Read line %r", line)
            rec = self.parse_line(line)
            yield rec

    def seek(self, tid):
        # type: (int) -> int
        """
        Seek to given transaction id.

        :param tid: Transaction id.
        :returns: Offset.
        :raises IndexError: if the index entry is marked as invalid.
        """
        assert self.index
        assert self.first <= tid <= self.index.count

        rec = self.index[tid]
        if not rec.valid:
            self.log.warning("Transaction %d is invalid", tid)
            raise IndexError(tid)
        pos = rec.offset
        self.log.debug("Seeking to transaction %d at position %d", tid, pos)
        assert 0 <= pos < self.size, pos
        assert self.translog
        self.translog.seek(pos, SEEK_SET)
        return pos

    def read(self, offset):
        # type: (int) -> Transaction
        """
        Read the transaction whose line contains the given offset.

        The file position is left after that line.

        :param offset: absolute file offset.
        :returns: The transaction.
        :raises EOFError: if reading past the end.
        """
        assert 0 <= offset <= self.size, offset
        if offset >= self.size:
            raise EOFError()

        assert self.translog
        # Read up to BSIZE bytes before the offset to find the beginning of the line.
        pos = max(0, offset - BSIZE)
        self.translog.seek(pos, SEEK_SET)
        data = self.translog.read(offset - pos)
        self.log.debug("Read from %d: %r", pos, data)

        before = data.rsplit('\n', 1)[-1]
        after = self.translog.readline()
        self.log.debug("Read line %r %r", before, after)

        line = before + after
        rec = self.parse_line(line)

        return rec


class Abort(Exception):
    """
    Fatal abort.

    Raised after a critical error has already been logged; :py:func:`main`
    turns it into exit code 1.
    """
    pass


class DryRun(Exception):
    """
    Abort dry-run mode.
    """
    pass


def main():
    # type: () -> int
    """
    Parse the command line, set up logging and run the selected sub-command.

    :returns: The result of the sub-command or 0 if it returns nothing; 1 if it aborted.
    """
    opt = parse_args()

    # Start at CRITICAL and lower the threshold by one level per verbosity step, but never below DEBUG.
    level = max(DEBUG, CRITICAL - DEBUG * opt.verbose)
    basicConfig(stream=stderr, level=level, format=LOG_FORMAT)

    try:
        result = opt.func(opt)
    except Abort:
        return 1
    return result or 0


@contextmanager
def ldapi(opt):
    # type: (Namespace) -> Iterator[ReconnectLDAPObject]
    """
    Return local LDAP connection.

    The connection is bound using SASL EXTERNAL and is unbound again when the
    context is left, even if the body raised an exception.

    :param opt: Command line options.
    :returns: A initialized LDAP connection.
    """
    log = getLogger(__name__).getChild("ldap")

    log.debug("ldap_set_option(PROTOCOL_VERSION)")
    ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3)
    log.debug("ldap_initialize(%s)", opt.translog_ldap)
    ld = ReconnectLDAPObject(
        opt.translog_ldap,
        trace_level=max(0, opt.verbose - 4),
        retry_max=3,
        retry_delay=10.0,
    )
    log.debug("ldap_bind()")
    ld.sasl_external_bind_s()

    try:
        yield ld
    finally:
        # Release the connection on errors, too.
        log.debug("ldap_unbind()")
        ld.unbind_ext_s()


def import_all(opt):
    # type: (Namespace) -> None
    """
    Load transaction from file into LDAP server.

    The first transaction to load is the earliest one selected by any of the
    three limits `opt.count` (number of transactions), `opt.percent` (fraction
    of the file size) and `opt.size` (number of bytes), further restricted by
    `opt.min` and `opt.max`.

    :param opt: Command line options.
    """
    log = getLogger(__name__).getChild("import")
    log.info("Reading transactions from file '%s'...", opt.translog_file)

    with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog, ldapi(opt) as ld:
        if not translog.size:
            log.debug("File is empty; no transactions to process.")
            return

        # First transaction according to the requested number of transactions.
        c_tid = max(1, translog.last - opt.count + 1)
        log.debug("count.tid=%d count=%d", c_tid, opt.count)

        # First transaction according to the requested fraction of the file.
        p_offset = int(translog.size * (1.0 - opt.percent))
        try:
            p_trans = translog.read(p_offset)
            p_tid = p_trans.tid
        except EOFError:
            p_tid = translog.last
        log.debug("percent.tid=%d off=%d percent=%f", p_tid, p_offset, opt.percent)

        # First transaction according to the requested number of bytes.
        s_offset = max(0, translog.size - opt.size)
        try:
            s_trans = translog.read(s_offset)
            s_tid = s_trans.tid
        except EOFError:
            s_tid = translog.last
        log.debug("size.tid=%d off=%d size=%d", s_tid, s_offset, opt.size)

        start = max(1, min(c_tid, p_tid, s_tid), translog.first, opt.min)
        assert translog.first <= start <= translog.last, start
        end = min(opt.max or translog.last, translog.last)

        if opt.index and index.count:
            log.info("Processing transactions %d .. %d", start, end)
            translog.seek(start)
            lead = []  # type: List[Transaction]
        else:
            offset = min(p_offset, s_offset)
            log.info("Processing transaction from offset %d .. %d", offset, end)
            try:
                rec = translog.read(offset)
            except EOFError:
                rec = None

            if rec is not None and rec.tid <= start:
                lead = [rec]
            else:
                # The offset is at the end of the file or already behind the
                # first transaction to load: scan the file from the beginning
                # instead of failing or silently skipping transactions.
                assert translog.translog is not None
                translog.translog.seek(0, SEEK_SET)
                lead = []

        t_begin = t_last = time()
        total = end - start + 1
        for rec in chain(lead, translog):
            if rec.tid < start:
                continue
            if rec.tid > end:
                break

            add_transaction(opt, ld, rec)

            if stderr.isatty():
                t_now = time()
                if t_now - t_last >= 5.0:
                    # At least one transaction is done here, so `p_done` is never zero.
                    done = rec.tid - start + 1
                    p_done = float(done) / total
                    t_used = t_now - t_begin
                    stderr.write('\rprocessed {:d} of {:d} [{:.1f}%] in {:.1f}s, {:d} remaining in {:.1f}s  \r'.format(
                        done,
                        total,
                        p_done * 100.0,
                        t_used,
                        end - rec.tid,
                        t_used / p_done - t_used,
                    ))
                    t_last = t_now


def load_single(opt):
    # type: (Namespace) -> None
    """
    Load the transactions listed in `opt.trans` into the LDAP server.

    Transactions which cannot be looked up in the transaction file are skipped.

    :param opt: Command line options.
    """
    with Index(opt.translog_file) as index:
        with Translog(opt.translog_file, index) as translog:
            with ldapi(opt) as ld:
                for number in opt.trans:
                    try:
                        transaction = translog[number]
                    except IndexError:
                        # Not (validly) indexed: nothing to load.
                        pass
                    else:
                        add_transaction(opt, ld, transaction)


def show_stat(opt):
    # type: (Namespace) -> None
    """
    Print statistics about the index, the transaction file and the LDAP server.

    :param opt: Command line options.
    """
    with Index(opt.translog_file) as index:
        with Translog(opt.translog_file, index) as translog:
            first, last = translog.first, translog.last
            in_sync = 'yes' if index.count == last else 'no'
            report = (
                ("Index.file: %s", index.filename),
                ("Index.size: %d", index.size),
                ("Index.count: %d", index.count),
                ("Index.okay: %s", in_sync),
                ("Translog.file: %s", translog.filename),
                ("Translog.size: %d", translog.size),
                ("Translog.first: %d", first),
                ("Translog.last: %d", last),
                ("Translog.count: %d", translog.count),
            )
            for template, value in report:
                print(template % (value,))

    # Walk backwards from the last transaction of the file until one is found in LDAP.
    tid = last
    with ldapi(opt) as ld:
        while tid >= first:
            dn = "reqSession={},{}".format(tid, opt.translog_base)
            try:
                ld.search_ext_s(dn, ldap.SCOPE_BASE)
            except ldap.NO_SUCH_OBJECT:
                tid -= 1
            else:
                break
    print("Ldap.last: %d" % tid)
    print("Ldap.okay: %s" % ('yes' if last == tid else 'no',))


def dump_index(opt):
    # type: (Namespace) -> None
    """
    Print the index entries of the transactions from `opt.min` to `opt.max`.

    The upper bound is limited to the number of indexed transactions.

    :param opt: Command line options.
    """
    log = getLogger(__name__).getChild("INDEX")

    with Index(opt.translog_file) as index:
        lower = opt.min
        upper = index.count if not opt.max else min(opt.max, index.count)
        log.debug('Dumping index from [%d, %d] ...', lower, upper)
        for tid in xrange(lower, upper + 1):
            entry = index[tid]
            marker = 'x' if entry.valid else ' '
            print('%8d[%c]: %d' % (tid, marker, entry.offset))


def reindex(opt):
    # type: (Namespace) -> None
    """
    Re-build transaction index

    The new index is written to a temporary file, which replaces the old index
    on success only.

    :param opt: Command line options.
    """
    log = getLogger(__name__).getChild("REINDEX")

    index = Index(opt.translog_file)
    log.warning("Ignore the following 2 WARNINGS about the missing and mismatching index")
    # `rewrite()` yields the name of the temporary file, which is assigned to
    # `index.filename` so that the index gets created there.
    with stopped(opt, (UDN,)), rewrite(index.filename) as index.filename:
        with index, Translog(opt.translog_file, index) as translog:
            log.warning("Re-building index %s now...", index.filename)
            assert translog.translog is not None
            pos = 0
            for lnr, line in enumerate(translog.translog, start=1):
                rec = translog.parse_line(line)
                log.info("%d: tid=%d @ %d", lnr, rec.tid, pos)
                index[rec.tid] = pos
                # Offsets are counted in bytes. A Python 2 `str` already is a
                # byte string: calling `encode()` on it would implicitly decode
                # it as ASCII first and fail for DNs containing non-ASCII
                # characters.
                raw = line if isinstance(line, bytes) else line.encode('utf-8')
                pos += len(raw)


def lookup(opt):
    # type: (Namespace) -> None
    """
    Print the transactions listed in `opt.trans` as found in the transaction file.

    :param opt: Command line options.
    """
    with Index(opt.translog_file) as index:
        with Translog(opt.translog_file, index) as translog:
            for number in opt.trans:
                transaction = translog[number]
                print(transaction)


def lookup_ldap(opt):
    # type: (Namespace) -> None
    """
    Look up the transactions listed in `opt.trans` in the LDAP server and print them.

    :param opt: Command line options.
    :raises Abort: if a transaction does not exist and `opt.lenient` is not set.
    """
    log = getLogger(__name__).getChild("LDAP")

    with ldapi(opt) as ld:
        for tid in opt.trans:
            dn = "reqSession={},{}".format(tid, opt.translog_base)
            try:
                result = ld.search_ext_s(dn, ldap.SCOPE_BASE)
            except ldap.NO_SUCH_OBJECT as ex:
                desc = ex.args[0]["desc"]
                if not opt.lenient:
                    log.critical("ldap_search(%s): %s", dn, desc)
                    raise Abort()
                # Lenient mode: there is no result to print for this
                # transaction, so continue with the next one.
                log.error("ldap_search(%s): %s", dn, desc)
                continue
            ((dn, attrs),) = result
            print('tid={0[reqSession][0]} dn={0[reqDN][0]} command={0[reqType][0]}'.format(attrs))


def add_transaction(opt, ld, rec):
    # type: (Namespace, ReconnectLDAPObject, Transaction) -> None
    """
    Add one transaction as `auditObject` entry below `opt.translog_base`.

    Nothing is written in dry-run mode.

    :param opt: Command line options.
    :param ld: LDAP server connection.
    :param rec: Transaction to add.
    :raises Abort: on fatal errors. Already existing entries and syntax errors are only fatal unless `opt.lenient` is set.
    """
    log = getLogger(__name__).getChild("LDAP")

    dn = "reqSession={.tid},{}".format(rec, opt.translog_base)
    entry = {
        "objectClass": ["auditObject"],
        "reqStart": [opt.datetime],
        "reqType": ["{.command}".format(rec)],
        "reqSession": ["{.tid}".format(rec)],
        "reqDN": ["{.dn}".format(rec)],
    }
    modlist = addModlist(entry)

    log.debug("ldap_add(%s)", dn)
    if opt.dry_run:
        return

    try:
        ld.add_ext_s(dn, modlist)
    except (ldap.ALREADY_EXISTS, ldap.INVALID_SYNTAX) as ex:
        desc = ex.args[0]["desc"]
        if opt.lenient:
            log.error("ldap_add(%s): %s", rec, desc)
        else:
            log.critical("ldap_add(%s): %s", rec, desc)
            raise Abort()
    except ldap.LDAPError as ex:
        log.critical("ldap_add(%s): %s", rec, ex)
        raise Abort()


def check(opt):
    # type: (Namespace) -> None
    """
    Check transaction files for consistency.

    The files are checked in sequence: each check continues with the last
    transaction of the previous file.

    :param opt: Command line options.
    """
    if opt.role not in SERVICES:
        print('This command is only for UCS Master and Backups.')
        exit(0)

    with stopped(opt, SERVICES[opt.role]):
        translog = CheckTranslog(opt, opt.translog_file)
        translog.run()
        previous = translog  # type: CheckGeneric

        # The private listener file is optional and sits between the translog and the listener file.
        if exists(opt.listener_private_file):
            listener_priv = CheckListener(opt, opt.listener_private_file)
            listener_priv.first_tid = previous.last_tid
            listener_priv.first_line = previous.last_line
            listener_priv.run()
            previous = listener_priv

        listener = CheckListener(opt, opt.listener_file)
        listener.first_tid = previous.last_tid
        listener.first_line = previous.last_line
        listener.run()

        if opt.role == 'domaincontroller_master':
            check_last(opt, opt.last_file, listener.last_tid)

        # TODO: check cn=translog (Bug #49225)


@contextmanager
def stopped(opt, services):
    # type: (Namespace, Tuple[str, ...]) -> Iterator
    """
    Stop services while in context.

    Nothing is stopped if `opt.skip_services` is set or `opt.fix` is not set.
    The services are only started again if the body finished without raising
    an exception.

    :param opt: Command line options.
    :param services: Sequence of service names.
    """
    log = getLogger(__name__).getChild("SERVICE")

    if opt.skip_services or not opt.fix:
        yield
    else:
        log.info("Stopping %s", services)
        call(('systemctl', 'stop') + services)

        # An exception raised by the body surfaces here and skips the restart below.
        yield

        log.info("Starting %s", services)
        call(('systemctl', 'start') + services)


def _rewrite_discard(filename, log):
    # type: (str, Any) -> None
    """
    Remove the temporary file of :py:func:`rewrite`, if it exists.

    This is a separate function so that handling an error of `unlink` does not
    replace the exception being re-raised by the caller under Python 2.

    :param filename: Name of the file to remove.
    :param log: Logger for debug messages.
    :raises EnvironmentError: if the file exists but cannot be removed.
    """
    if not exists(filename):
        return
    try:
        unlink(filename)
    except EnvironmentError as ex:
        log.debug('unlink %s: %s', filename, ex)
        if ex.errno != ENOENT:
            raise


@contextmanager
def rewrite(filename):
    # type: (str) -> Iterator[str]
    """
    Return name of temporary file to replace given file on success.

    On success the current file is kept as `<filename>.bak` and the temporary
    file `<filename>.new` is renamed to `filename`. On any error, including
    `SystemExit` and `KeyboardInterrupt`, the temporary file is removed and the
    current file is left untouched.

    :param filename: Name of file to replace on success.
    :returns: Name of temporary file.
    """
    log = getLogger(__name__).getChild("rewrite")
    cur = filename
    new = filename + '.new'
    bak = filename + '.bak'
    try:
        yield new
    except BaseException as ex:
        log.debug('abort: %s', ex)
        _rewrite_discard(new, log)
        raise
    else:
        log.debug('renaming %s %s', cur, bak)
        try:
            rename(cur, bak)
        except EnvironmentError as ex:
            # There is nothing to back up if the current file does not exist yet.
            log.debug('rename %s %s: %s', cur, bak, ex)
            if ex.errno != ENOENT:
                raise

        log.debug('renaming %s %s', new, cur)
        rename(new, cur)


class CheckGeneric(object):
    """
    Check transaction file for consistency.

    This is the common base: :py:meth:`run` reads the file line by line, flags
    problems through :py:meth:`error` and finally calls :py:meth:`fixit`.
    Sub-classes must implement :py:meth:`check_transaction`, :py:meth:`fixit`
    and :py:meth:`remove_index`.
    """

    def __init__(self, opt, filename):
        # type: (Namespace, str) -> None
        """
        Check transaction file for consistency.

        :param opt: Command line options.
        :param filename: The name of the file containing transaction lines.
        """
        self.opt = opt
        self.filename = filename
        self.log = getLogger(__name__).getChild("CHECK").getChild(self.__class__.__name__)
        # Transaction id and line the file is expected to continue from;
        # `check()` sets them from the file checked before.
        self.first_tid = 0
        self.first_line = ''
        self.reset()

    def reset(self):
        # type: () -> None
        """
        Reset internal state.

        All error flags are cleared and the last seen transaction is set back
        to :py:attr:`first_tid` and :py:attr:`first_line`.
        """
        self.needs_fixing = False
        self.needs_sort = False
        self.needs_fill = False
        self.needs_renum = False
        self.needs_syntax = False
        self.lnr = 0
        self.line = ''
        self.last_line = self.first_line
        self.last_tid = self.first_tid

    def error(self, string, *args, **kwargs):
        # type: (str, *Any, **bool) -> None
        """
        Flag error in file.

        The message is logged prefixed with the file name, the current line
        number and the current line.

        :param string: A descriptive format string, which is formatted using any additional positional arguments.
        :param bool fix: The error must be fixed. Defaults to `True`.
        :param bool sort: The error can be fixed by sorting the transactions.
        :param bool fill: The error can be fixed by filling the hole with dummy entries.
        :param bool renum: The error can be fixed by re-numbering all transactions.
        :param bool syntax: The line contains syntax error needing manual fixing.
        """
        self.log.error('%s:%d:%r: ' + string, self.filename, self.lnr, self.line, *args)
        # Flags are only ever set here; they are cleared by `reset()`.
        self.needs_fixing |= kwargs.get('fix', True)
        self.needs_sort |= kwargs.get('sort', False)
        self.needs_fill |= kwargs.get('fill', False)
        self.needs_renum |= kwargs.get('renum', False)
        self.needs_syntax |= kwargs.get('syntax', False)

    def run(self, filename=''):
        # type: (str) -> None
        """
        Run check on transaction file.

        The state is reset first. Problems are only acted upon when checking
        the original file, i.e. when no `filename` is given: on systems other
        than the UCS Master the program exits with status 1, otherwise
        :py:meth:`fixit` is called.

        :param filename: The name of the transaction file. Defaults to :py:attr:`filename`.
        """
        self.reset()
        with open(filename or self.filename, 'r') as stream:
            self.log.info("Reading %s ...", filename or self.filename)
            for lnr, line, rec in self.loop(stream):
                self.check_transaction(rec)
            self.log.info("Reading %s done", filename or self.filename)

        # Re-checks of temporary files done by the `fix_*` methods pass a file
        # name and must not trigger another round of fixing.
        if self.needs_fixing and filename == '':
            print()
            print('%s needs fixing:' % (self.filename,))
            if self.opt.role != 'domaincontroller_master':
                print('This UCS system must be re-joined!')
                print()
                print(HELP_FIX)
                exit(1)

            self.fixit()

    def loop(self, stream):
        # type: (BinaryIO) -> Iterator[Tuple[int, str, Transaction]]
        """
        Loop over all lines in the stream.

        Lines which cannot be split at all or which have no positive
        transaction id are flagged and skipped. :py:attr:`last_tid` and
        :py:attr:`last_line` are updated after the consumer has processed the
        yielded record.

        :param stream: The input stream.
        :returns: yields 3-tuples (line-number, cleaned-line-content, parsed-transaction-record).
        """
        for self.lnr, line in enumerate(stream, start=1):
            self.line = line
            try:
                if line.endswith('\n'):
                    line = line[:-1]
                else:
                    self.error("Missing newline line", fix=False)

                stripped = line.strip()
                if line != stripped:
                    self.error("Extra whitespace", fix=False)
                    line = stripped
                stripped = line.strip('\x00')
                if line != stripped:
                    self.error("Binary zeros", fix=False)
                    line = stripped

                try:
                    rec = Translog.parse_line(line)
                except ValueError:
                    # Ok let's try to identify the problem
                    try:
                        tid_, rest = line.split(' ', 1)
                    except ValueError:
                        self.error("Invalid line", syntax=True)
                        continue
                    try:
                        tid = int(tid_)
                    except ValueError:
                        # Flagged as invalid transaction id below.
                        tid = -1
                    try:
                        dn, command = rest.split(' ', 1)
                    except ValueError:
                        self.error("Invalid line", syntax=True)
                        dn, command = rest, ''

                    rec = Transaction(tid, dn, command)

                if not 0 < rec.tid:
                    self.error("Invalid transaction id", renum=True)
                    continue

                if rec.command not in {'r', 'd', 'a', 'm'}:  # mod_Rdn Delete Add Modify
                    self.error("Invalid command")
                    # if self.opt.fix:
                    #     rec = rec._replace(command='m')

                if not is_dn(rec.dn):
                    self.error("Invalid dn")
                    # if self.opt.fix:
                    #     rec = rec._replace(dn=self.opt.base)
                elif not rec.dn.endswith(self.opt.base):
                    self.error("Foreign dn", fix=False)

                yield (self.lnr, line, rec)

                # Only reached for records handed to the consumer.
                self.last_tid = rec.tid
            finally:
                # Also runs for lines skipped by `continue` above.
                self.last_line = line

    def check_transaction(self, rec):
        # type: (Transaction) -> None
        """
        Check transaction for consistency.

        Must be implemented by sub-classes.

        :param rec: The transaction record to check.
        """
        raise NotImplementedError()

    def fixit(self):
        # type: () -> None
        """
        Fix transactions.

        Must be implemented by sub-classes.
        """
        raise NotImplementedError()

    def fix_syntax(self):
        # type: () -> None
        # TODO: Loop over lines and
        # - strip binary zeros
        # - add missing command in 3rd column
        # - optional: fake invalid DNs in 2nd column
        # - optional: fake transaction number
        # see commented out code in :py:meth:`loop`.
        """
        Handle syntax errors.

        Not implemented yet: prints a hint and exits with status 1.
        """
        self.log.info('Syntax ...')

        print('- contains unparseable lines!')
        print()
        print(HELP_RESET)
        exit(1)

    def fix_sort(self):
        # type: () -> None
        """
        Handle transactions not being sorted strictly monotonically increasing.

        The file is sorted uniquely into a temporary file using the external
        `sort` command and checked again. The program exits if sorting fails
        (status 2), if the result still needs sorting or if `opt.fix` is not
        set (status 1).
        """
        self.log.info('Sorting ...')
        print('- the transactions are not sorted uniquely')

        with rewrite(self.filename) as tmp:
            cmd = (
                'sort',
                '--ignore-leading-blanks',
                '--ignore-nonprinting',
                '--key', '1n',  # transaction ID
                '--key', '2',  # DN
                '--key', '3r',  # command 'r', 'm', 'd', 'a'
                '--unique',
                '--output', tmp,
                self.filename,
            )
            env = dict(environ)
            env['LC_ALL'] = 'C.UTF-8'
            self.log.debug('Running %r ...', cmd)
            if call(cmd, env=env):
                print('- failed to sort file!')
                print('Check available space of filesystem?')
                exit(2)

            # Re-check the sorted file; this resets and re-computes all flags.
            self.run(filename=tmp)

            if self.needs_sort:
                print('- still contains duplicate transactions after unique sorting!')
                print('UCS Master must be reset and all other UCS systems must be re-joined!')
                print()
                print(HELP_RESET)
                exit(1)

            if not self.opt.fix:
                print()
                print(HELP_RERUN)
                print(HELP_FIX)
                exit(1)

            self.remove_index()
        self.log.info('Sorting done')

    def fix_fill(self):
        # type: () -> None
        """
        Handle transactions not being consecutive.

        Missing transactions are filled with dummy modifications of the LDAP
        base `opt.base`. The program exits with status 1 if `opt.fix` is not
        set.
        """
        self.log.info('Filling holes ...')
        print('- missing transactions in sequence')

        if not self.opt.fix:
            print()
            print(HELP_RERUN)
            print(HELP_FIX)
            exit(1)

        with rewrite(self.filename) as tmp:
            with open(self.filename, 'r') as old, open(tmp, 'w') as new:
                # Transaction id expected next; 0 as long as no line has been parsed.
                next_tid = 0
                for line in old:
                    try:
                        rec = Translog.parse_line(line)
                        assert next_tid == 0 or next_tid <= rec.tid, (next_tid, rec.tid)
                        while next_tid != 0 and next_tid < rec.tid:
                            self.log.info('Filling %d ...', next_tid)
                            fill_rec = Transaction(tid=next_tid, dn=self.opt.base, command='m')
                            fill_line = Translog.format_line(fill_rec)
                            new.write(fill_line)
                            next_tid += 1

                        next_tid = rec.tid + 1
                    except ValueError:
                        # Unparseable lines are copied unmodified.
                        pass
                    new.write(line)

            # Re-check the filled file; this resets and re-computes all flags.
            self.run(filename=tmp)

            assert not self.needs_fill

            self.remove_index()
        self.log.info('Filling holes done')

    def fix_renumber(self):
        # type: () -> None
        """
        Handle transactions being out-of-order.

        All lines are re-numbered consecutively starting after
        :py:attr:`first_tid`. The program exits with status 1 if `opt.fix` is
        not set.

        :raises ValueError: if a line cannot be split into its three parts.
        """
        self.log.info('Renumbering ...')
        print('- pending transactions are not consecutive')

        if not self.opt.fix:
            print()
            print(HELP_RERUN)
            print(HELP_FIX)
            exit(1)

        with rewrite(self.filename) as tmp:
            with open(self.filename, 'r') as old, open(tmp, 'w') as new:
                for self.last_tid, line in enumerate(old, start=self.first_tid + 1):
                    line = line.rstrip('\n')
                    self.log.debug('Renum %r to %d ...', line, self.last_tid)
                    try:
                        rec = Translog.parse_line(line)
                    except ValueError:
                        try:
                            tid_, rest = line.split(' ', 1)
                        except ValueError:
                            raise
                        try:
                            tid = int(tid_)
                        except ValueError:
                            # The old number is replaced below anyway.
                            tid = -1
                        try:
                            dn, command = rest.split(' ', 1)
                        except ValueError:
                            raise
                        rec = Transaction(tid, dn, command)

                    rec = rec._replace(tid=self.last_tid)
                    line = Translog.format_line(rec)
                    new.write(line)

            # Re-check the re-numbered file; this resets and re-computes all flags.
            self.run(filename=tmp)

            assert not self.needs_renum
        self.log.info('Renumbering done')

    def remove_index(self):
        # type: () -> None
        """
        Remove any associated index file.

        Must be implemented by sub-classes.
        """
        raise NotImplementedError()


class CheckTranslog(CheckGeneric):
    """
    Consistency checker for the committed transactions file.
    :file:`/var/lib/univention-ldap/notify/transaction`
    """

    def check_transaction(self, rec):
        # type: (Transaction) -> None
        """
        Validate the ID of one committed transaction against its predecessor.

        The first line is only logged. Any later line must carry exactly
        the ID following `self.last_tid`.

        :param rec: The transaction record to check.
        """
        if self.lnr == 1:
            self.log.info("%s:%d: Starts with: %r", self.filename, self.lnr, self.line)
            return

        if rec.tid <= self.last_tid:
            # ID did not increase: flag the file for sorting.
            self.error("Repeated line after %r", self.last_line, sort=True)
            return

        if rec.tid != self.last_tid + 1:
            # ID jumped ahead: flag the file for filling.
            self.error("Hole after %r", self.last_line, fill=True)

    def fixit(self):
        # type: () -> None
        """
        Run the repair steps that were flagged as needed.

        The flags are evaluated one after the other, in the order syntax,
        sort, fill.
        """
        if self.needs_syntax:
            self.fix_syntax()
        if self.needs_sort:
            self.fix_sort()
        if self.needs_fill:
            self.fix_fill()

    def remove_index(self):
        # type: () -> None
        """
        Remove the `.index` file via :py:class:`Index` for `self.filename`.
        """
        Index(self.filename).remove()


class CheckListener(CheckGeneric):
    """
    Consistency checker for the pending transactions file.
    :file:`/var/lib/univention-ldap/listener/listener`
    """

    def check_transaction(self, rec):
        # type: (Transaction) -> None
        """
        Validate the ID of one pending transaction against its predecessor.

        Every deviation from `self.last_tid + 1` is reported as an error
        that flags the file for renumbering.

        :param rec: Transaction record.
        """
        if rec.tid == self.last_tid + 1:
            # Consecutive: nothing to report.
            return

        if rec.tid <= self.last_tid:
            self.error("Repeated line %r", self.last_line, renum=True)
        elif self.lnr == 1:
            # A gap on the very first line is reported relative to the translog file.
            self.error("Not continuous with %s: %r", self.opt.translog_file, self.last_line, renum=True)
        else:
            self.error("Hole after %r", self.last_line, renum=True)

    def fixit(self):
        # type: () -> None
        """
        Run the repair steps that were flagged as needed.

        The flags are evaluated one after the other: syntax first, then
        renumbering.
        """
        if self.needs_syntax:
            self.fix_syntax()
        if self.needs_renum:
            self.fix_renumber()

    def remove_index(self):
        # type: () -> None
        """
        Do nothing: this file has no associated `.index` file.
        """
        pass


def check_last(opt, filename, last_tid):
    # type: (Namespace, str, int) -> None
    """
    Verify the file storing the last used transaction ID.
    :file:`/var/lib/univention-ldap/last_id`

    An unreadable, empty, non-numeric or differing file is logged as an
    error. With `opt.fix` the file is then overwritten with `last_tid`;
    otherwise hints are printed and the program exits with status 1.

    :param opt: Command line options.
    :param filename: File name.
    :param last_tid: Expected transaction id.
    """
    log = getLogger(__name__).getChild("CHECK")

    try:
        with open(filename, 'r') as stream:
            content = stream.read()
        if not content:
            raise ValueError('should be %d, but is empty' % (last_tid,))
        current = int(content)
        if current != last_tid:
            raise ValueError('should be %d, but is %d' % (last_tid, current))
    except (EnvironmentError, ValueError) as ex:
        problem = ex
    else:
        # File content matches the expected ID.
        return

    log.error("%s: Invalid last id: %s", filename, problem)
    if not opt.fix:
        print()
        print('%s needs manual fixing!' % (filename,))
        print()
        print(HELP_RERUN)
        print(HELP_FIX)
        exit(1)

    # The ID is written without a trailing newline.
    with open(filename, 'w') as stream:
        stream.write('%d' % (last_tid,))
    log.debug("%s: Updated to: %d", filename, last_tid)


def prune(opt):
    # type: (Namespace) -> None
    """
    Remove old transactions from the transaction file and from LDAP.

    If `opt.role` is not listed in `SERVICES`, a message is printed and the
    program exits with status 0.

    :param opt: Command line options.
    """
    if opt.role not in SERVICES:
        print('This command is only for UCS Master and Backups.')
        exit(0)

    # A dry run also sets `skip_services`.
    opt.skip_services = opt.dry_run
    services = (UDN,)
    with stopped(opt, services):
        try:
            prune_file(opt)
        except DryRun:
            # A dry run of the file stage must not prevent the LDAP stage.
            pass
        prune_ldap(opt)


def prune_file(opt):
    # type: (Namespace) -> None
    """
    Drop all transactions older than `opt.trans` from the transaction file.

    A negative `opt.trans` is a count of transactions to keep; it is
    replaced in `opt` by the resulting absolute transaction ID.

    :param opt: Command line options.
    :raises Abort: if `opt.trans` is not strictly between the first and last transaction ID of the file.
    :raises DryRun: after writing the temporary file if `opt.dry_run` is set.
    """
    log = getLogger(__name__).getChild("prune").getChild("file")

    with Index(opt.translog_file) as index, Translog(opt.translog_file, index) as translog:
        if opt.trans < 0:
            log.info('Keeping last %d transactions before %d', -opt.trans, translog.last)
            # Convert the relative count into the oldest ID to keep.
            opt.trans += translog.last + 1

        log.info('Purging/keeping %d..%d..%d', translog.first, opt.trans, translog.last)
        if opt.trans <= translog.first:
            log.fatal('Already purged.')
            raise Abort()
        if opt.trans >= translog.last:
            log.fatal('Nothing to purge.')
            raise Abort()
        assert translog.first < opt.trans < translog.last

        with rewrite(opt.translog_file) as tmp:
            with open(tmp, 'w') as new:
                for rec in translog:
                    if rec.tid >= opt.trans:
                        new.write(Translog.format_line(rec))
                    else:
                        log.debug('Dropping %d', rec.tid)

                if opt.dry_run:
                    raise DryRun()

        index.remove()


def prune_ldap(opt):
    # type: (Namespace) -> None
    """
    Delete all transactions older than `opt.trans` from LDAP.

    The direct children of `opt.translog_base` having a `reqSession`
    attribute are searched in pages of 1000 entries. An entry is deleted if
    the `reqSession` value of its first RDN is below `opt.trans`. With
    `opt.dry_run` the deletion itself is skipped, but still logged. A
    failed deletion is logged and the remaining entries are processed.

    :param opt: Command line options.
    :raises Abort: if the server does not return the paged results control.
    """
    log = getLogger(__name__).getChild("prune").getChild("ldap")

    base = opt.translog_base
    scope = ldap.SCOPE_ONELEVEL
    filterstr = '(reqSession=*)'
    attrlist = ['1.1']  # LDAP_NO_ATTRS
    paging = SimplePagedResultsControl(True, size=1000, cookie='')
    with ldapi(opt) as ld:
        while True:
            msgid = ld.search_ext(base, scope, filterstr, attrlist, serverctrls=[paging])
            _rtype, entries, _rmsgid, reply_ctrls = ld.result3(msgid)

            paged = [ctrl for ctrl in reply_ctrls if ctrl.controlType == SimplePagedResultsControl.controlType]
            if not paged:
                log.fatal('Server ignores RFC 2696 control: %r', reply_ctrls)
                raise Abort()
            reply = paged[0]

            for (dn, _attrs) in entries:
                # Only the first RDN is inspected for the transaction ID.
                sessions = [value for (attr, value, _flags) in str2dn(dn)[0] if attr == 'reqSession']
                if not sessions:
                    continue
                if int(sessions[0]) >= opt.trans:
                    log.debug('Keeping %s', dn)
                    continue
                try:
                    log.debug("ldap_delete(%s)", dn)
                    if not opt.dry_run:
                        ld.delete_ext_s(dn)
                    log.info("Deleted %s", dn)
                except ldap.LDAPError as ex:
                    log.error("ldap_delete(%s): %s", dn, ex)

            # An empty cookie marks the last page.
            if not reply.cookie:
                break
            paging.cookie = reply.cookie


def parse_args(args=None):
    # type: (List[str]) -> Namespace
    """
    Build the command line parser and parse the given arguments.

    The defaults of `--base` and `--role` are read from a freshly loaded
    `ConfigRegistry`. Each sub-command stores its handler as `func`.

    :param args: the list of arguments to process (default: `sys.argv[1:]`)
    :returns: a Namespace instance.
    """
    ucr = ConfigRegistry()
    ucr.load()

    # Options shared by all sub-commands.
    parser = ArgumentParser(description=__doc__)
    parser.add_argument(
        "--translog-file", "-T", metavar="FILENAME",
        help="Transaction file [%(default)s]",
        default="/var/lib/univention-ldap/notify/transaction")
    parser.add_argument(
        "--translog-ldap", "-H", metavar="URL",
        help="LDAP URL [%(default)s]",
        default="ldapi:///")
    parser.add_argument(
        "--translog-base", "-B", metavar="DN",
        help="LDAP base for translog [%(default)s]",
        default="cn=translog")
    parser.add_argument(
        "--lenient", "-l", action="store_true",
        help="Ignore existing entries")
    parser.add_argument(
        "--verbose", "-v", action="count",
        help="Increase verbosity",
        default=2)
    parser.add_argument(
        "--dry-run", "-n", action="store_true",
        help="Do not modify anything")
    # NOTE(review): strftime() without a time tuple formats the local time,
    # while the trailing "Z" conventionally denotes UTC - confirm intent.
    parser.add_argument(
        "--datetime", "-d",
        help="Overwrite time-stamp for import [%(default)s]",
        default=strftime("%Y%m%d%H%M%SZ"))
    commands = parser.add_subparsers(title="subcommands", description="valid subcommands")

    # import
    cmd_import = commands.add_parser("import", help="Import transaction file into LDAP")
    cmd_import.add_argument(
        "--index", "-i", action="store_false",
        help="Do not use index to find transactions")
    amount = cmd_import.add_argument_group("minimum", description="Amount of transactions to import")
    amount.add_argument(
        "--count", "-c", type=parse_int_pos,
        help="Minimum number of transaction [%(default)s]",
        default=100000)
    amount.add_argument(
        "--percent", "-p", type=parse_percent,
        help="Minimum percentage of transaction file [%(default)s%%]",
        default="0")
    amount.add_argument(
        "--size", "-s", type=parse_size,
        help="Minimum number of bytes from transaction file [%(default)s]",
        default="10M")
    limit = cmd_import.add_argument_group("limit", description="Limit transactions to process")
    limit.add_argument(
        "--min", "-m", metavar="TID", type=parse_int_pos,
        help="First transaction ID to process [%(default)s]",
        default=1)
    limit.add_argument(
        "--max", "-M", metavar="TID", type=parse_int_pos,
        help="Last transaction ID to process")
    cmd_import.set_defaults(func=import_all)

    # load
    cmd_load = commands.add_parser("load", help="Load specified transactions into LDAP")
    cmd_load.add_argument(
        "trans", metavar="TID", type=parse_int_pos, nargs='+',
        help="Transaction number to process")
    cmd_load.add_argument(
        "--index", "-i", action="store_false",
        help="Do not use index to find transactions")
    cmd_load.set_defaults(func=load_single)

    # lookup
    cmd_lookup = commands.add_parser("lookup", help="Lookup transaction in file")
    cmd_lookup.add_argument(
        "trans", metavar="TID", type=parse_int_pos, nargs='+',
        help="Transaction number to lookup")
    cmd_lookup.set_defaults(func=lookup)

    # ldap
    cmd_ldap = commands.add_parser("ldap", help="Lookup transaction in LDAP")
    cmd_ldap.add_argument(
        "trans", metavar="TID", type=parse_int_pos, nargs='+',
        help="Transaction number to lookup")
    cmd_ldap.set_defaults(func=lookup_ldap)

    # stat
    cmd_stat = commands.add_parser("stat", help="Show statistics")
    cmd_stat.set_defaults(func=show_stat)

    # index
    cmd_index = commands.add_parser("index", help="Dump index file")
    cmd_index.add_argument(
        "--min", "-m", metavar="TID", type=parse_int_pos,
        help="First transaction ID to process [%(default)s]",
        default=1)
    cmd_index.add_argument(
        "--max", "-M", metavar="TID", type=parse_int_pos,
        help="Last transaction ID to process")
    cmd_index.set_defaults(func=dump_index)

    # reindex
    cmd_reindex = commands.add_parser("reindex", help="Re-build index file")
    cmd_reindex.add_argument(
        "--skip-services", "-S", action="store_true",
        help="Skip stopping and starting services")
    cmd_reindex.set_defaults(func=reindex, fix=True)

    # check
    cmd_check = commands.add_parser("check", help="Check transaction files")
    cmd_check.add_argument(
        "--fix", "-f", action="store_true",
        help="Try to fix issues")
    cmd_check.add_argument(
        "--base", "-b", metavar="DN",
        help="LDAP base [%(default)s]",
        default=ucr['ldap/base'])
    cmd_check.add_argument(
        "--role", "-r", metavar="ROLE",
        help="Server role [%(default)s]",
        default=ucr['server/role'])
    cmd_check.add_argument(
        "--listener-file", "-L", metavar="FILENAME",
        help="Listener file [%(default)s]",
        default="/var/lib/univention-ldap/listener/listener")
    cmd_check.add_argument(
        "--listener-private-file", "-P", metavar="FILENAME",
        help="Listener private file [%(default)s]",
        default="/var/lib/univention-ldap/listener/listener.priv")
    cmd_check.add_argument(
        "--last-file", "-I", metavar="FILENAME",
        help="Last_id file [%(default)s]",
        default="/var/lib/univention-ldap/last_id")
    cmd_check.add_argument(
        "--skip-services", "-S", action="store_true",
        help="Skip stopping and starting services")
    cmd_check.set_defaults(func=check)

    # prune
    cmd_prune = commands.add_parser("prune", help="Prune transaction files")
    cmd_prune.add_argument(
        "trans", metavar="TID", type=int,
        help="Oldest transaction number to keep (negative numbers: the number of transactions to keep)")
    cmd_prune.add_argument(
        "--role", "-r", metavar="ROLE",
        help="Server role [%(default)s]",
        default=ucr['server/role'])
    cmd_prune.set_defaults(func=prune, fix=True)

    return parser.parse_args(args)


def parse_int_pos(string):
    # type: (str) -> int
    """
    Parse a strictly positive integer from a command line string.

    :param string: The command line string.
    :returns: The parsed integer, which is greater than zero.
    :raises ArgumentTypeError: if the string is not a valid integer or the value is not positive.
    """
    try:
        # `int` switches to arbitrary precision by itself, so the
        # Python-2-only `long` builtin is not needed; unlike `long` it also
        # rejects a trailing `L` suffix.
        val = int(string)
    except ValueError:
        raise ArgumentTypeError("Positive integer required")
    if val <= 0:
        raise ArgumentTypeError("Positive integer required")
    return val


def parse_percent(string):
    # type: (str) -> float
    """
    Convert a percentage given on the command line into a fraction.

    Trailing `%` characters are ignored.

    :param string: The command line string.
    :returns: The percentage divided by 100, i.e. a value between 0.0 and 1.0.
    :raises ArgumentTypeError: if the string is not a number or the number is outside 0..100.
    """
    number = string.rstrip('%')
    try:
        percent = float(number)
    except ValueError:
        raise ArgumentTypeError("Invalid percentage")

    if 0.0 <= percent <= 100.0:
        return percent / 100.0
    raise ArgumentTypeError("Invalid percentage")


def parse_size(string):
    # type: (str) -> int
    """
    Parse a size given as a number with an optional binary unit.

    The number may have a fraction. The unit is one of `K`, `M`, `G`, `T`,
    `P`, `E` in either case, each step meaning a factor of 1024, and may be
    followed by any mix of the letters `i` and `b`, e.g. `10M`, `1.5KiB`
    or `5b`.

    :param string: The command line string.
    :returns: The parsed size as a whole number of bytes, rounded towards zero.
    :raises ArgumentTypeError: if the unit is unknown or the value is not a finite, non-negative number.
    """
    # Everything after the leading digits and dots is the unit suffix.
    suffix = string.lstrip(".0123456789")
    try:
        # Exactly one unit character must remain; a blank selects the scale 1.
        unit, = suffix.rstrip("iIbB").upper() or ' '
        scale = 1 << (10 * " KMGTPE".index(unit))
    except ValueError:
        raise ArgumentTypeError("Invalid unit")

    prefix = string[:len(string) - len(suffix)]
    try:
        size = float(prefix) * scale
        # An overly long number overflows to infinity, which cannot be
        # converted to an integer below; reject it as an invalid value.
        if not 0.0 <= size < float("inf"):
            raise ValueError()
    except ValueError:
        raise ArgumentTypeError("Invalid value")

    return int(size)


if __name__ == "__main__":
    # Script entry point: the value returned by main() becomes the exit status.
    exit(main())

