#!/usr/share/ucs-test/runner python2.6 ## desc: | ## Test filter mechanism for Listener cache ## bugs: [38823] ## tags: [WIP] ## packages: ## - univention-directory-listener (>> 8.1, << 9.0) ## exposure: dangerous __package__ = '' from tempfile import mkdtemp from shutil import rmtree from os import mkdir, chown from os.path import join from sys import exit from time import sleep from pwd import getpwnam from subprocess import Popen from univention.config_registry.frontend import handler_set from univention.testing.udm import UCSTestUDM from univention.testing.ucr import UCSTestConfigRegistry from univention.testing.utils import verify_ldap_object MODULE = 'container/ou' UNIQUE = 'ucs-test38823' TMPDIR = '/tmp' USERID = 'listener' class Environment(object): def __init__(self): self.tmpdir = None self.mdir = None self.cdir = None ent = getpwnam(USERID) self.uid = ent.pw_uid def __enter__(self): self.tmpdir = mkdtemp(prefix='ucs-test', dir=TMPDIR) print 'I: tmpdir=%r' % (self.tmpdir,) chown(self.tmpdir, self.uid, -1) self.mdir = self.mkdir('module') self.cdir = self.mkdir('cache') self.copy_handler() return self def mkdir(self, name): path = join(self.tmpdir, name) mkdir(path) chown(path, self.uid, -1) return path def copy_handler(self): with open(argv[0], 'r') as source: with open(join(self.mdir, 'filter.py'), 'w') as target: for line in source: if line.startswith('TMPDIR = '): line = 'TMPDIR = %r\n' % (self.tmpdir,) target.write(line) def __exit__(self, exc_type, exc_value, traceback): rmtree(self.tmpdir, ignore_errors=True) self.tmpdir = None class Listener(object): def __init__(self, ucr, env): self.ucr = ucr self.env = env self.proc = None self.cmd = [ '/usr/sbin/univention-directory-listener', '-b', self.ucr['ldap/base'], '-m', self.env.mdir, '-c', self.env.cdir, '-x', '-ZZ', '-D', self.ucr['ldap/hostdn'], '-y', '/etc/machine.secret', ] def __enter__(self): self.init_listener() self.run_listener() def init_listener(self): cmd = self.cmd + [ '-d', '2', '-i', ] proc = Popen(cmd, close_fds=True) print 'I: %d = %r' % (proc.pid, cmd,) ret = proc.wait() print 'I: ret=%r' % (ret,) if ret: exit(ret) def run_listener(self): cmd = self.cmd + [ '-d', '4', '-F', ] self.proc = Popen(cmd, close_fds=True) print 'I: %d = %r' % (self.proc.pid, cmd,) return self def __exit__(self, exc_type, exc_value, traceback): self.kill_listener() ret = self.proc.wait() print 'I: ret=%d' % (ret,) self.proc = None def kill_listener(self): if not self.proc: return self.proc.terminate() for i in xrange(6): ret = self.proc.poll() print 'I: ret=%s %d...' % (ret, i,) if ret is not None: break sleep((1 << i) / 10.0) else: self.proc.kill() def main(): error = False with Environment() as env: with UCSTestConfigRegistry() as ucr: handler_set( ['listener/cache/filter=(ou=%s)' % (UNIQUE,)], opts={'schedule': True}, quiet=False) with Listener(ucr, env): with UCSTestUDM() as udm: ou = udm.create_object(MODULE, name=UNIQUE) udm.modify_object(MODULE, dn=ou, description=UNIQUE) verify_ldap_object(ou) error |= unexpected_transactions(env, ucr) error |= is_not_in_cache(env, ucr) exit(1 if error else 0) def unexpected_transactions(env, ucr): found_add = False found_modify = False found_other = False dn = 'ou=%s,%s' % (UNIQUE, ucr['ldap/base']) with open(join(env.tmpdir, UNIQUE), 'r') as log: for line in log: line = line.strip() if line == repr((dn, True, False, 'a')): print 'I: Found add: %s' % (line,) found_add = True elif line == repr((dn, True, False, 'm')): print 'I: Found modify: %s' % (line,) found_modify = True else: found_other = True print 'E: Found other: %s' % (line,) return found_other or not found_add or not found_modify def is_not_in_cache(env, ucr): error = False dn = 'dn: ou=%s,%s' % (UNIQUE, ucr['ldap/base']) dump = join(env.tmpdir, 'dump') cmd = [ '/usr/sbin/univention-directory-listener-dump', '-c', env.cdir, '-O', dump, ] proc = Popen(cmd, close_fds=True) print 'I: %d = %r' % (proc.pid, cmd,) proc.wait() with open(dump, 'r') as cache: for line in cache: line = line.strip() if line == dn: error = True print 'E: Found DN: %s' % (line,) elif UNIQUE in line: print 'W: Found line: %s' % (line,) return error def handler(dn, new, old, cmd=''): with open(join(TMPDIR, UNIQUE), 'a') as log: print >> log, repr((dn, bool(new), bool(old), cmd)) if __name__ == '__main__': from sys import argv main() else: name = "filter" description = "Test filter mechanism for Listener cache" filter = "(objectClass=organizationalUnit)" attributes = [] modrdn = "1" # vim:set ft=python: