commit 14448a9e8506ab2e52ed44a600e196dfc7a6529f Author: Florian Best Date: Mon May 13 13:58:55 2019 +0200 Bug #46323: fix escaping of ldap DN and filters, and shell arguments diff --git a/base/univention-lib/python/admember.py b/base/univention-lib/python/admember.py index 0e34b61ac6..806aa46a5f 100644 --- a/base/univention-lib/python/admember.py +++ b/base/univention-lib/python/admember.py @@ -31,9 +31,7 @@ # . from __future__ import print_function -import ldb -import ldap -import ldap.sasl + import os import subprocess import locale @@ -42,10 +40,17 @@ import tempfile import ipaddr import time from datetime import datetime, timedelta +import pipes + +import ldb +import ldap +import ldap.sasl +from ldap.filter import filter_format from samba.dcerpc import nbt, security from samba.ndr import ndr_unpack from samba.net import Net from samba.param import LoadParm + import univention.config_registry import univention.uldap import univention.lib.package_manager @@ -71,6 +76,7 @@ finally: if orig_path: sys.path = orig_path + # Ensure univention debug is initialized def initialize_debug(): # Use a little hack to determine if univention.debug has been initialized @@ -88,6 +94,7 @@ def initialize_debug(): else: ud.set_level(ud.MODULE, oldLevel) + class failedToSetService(Exception): '''ucs_addServiceToLocalhost failed''' @@ -337,7 +344,7 @@ def check_ad_account(ad_domain_info, username, password, ucr=None): domain_sid = ndr_unpack(security.dom_sid, res[0][1]["objectSid"][0]) - res = lo_ad.search(filter="(sAMAccountName=%s)" % username, attr=["objectSid", "primaryGroupID"]) + res = lo_ad.search(filter=filter_format("(sAMAccountName=%s)", [username]), attr=["objectSid", "primaryGroupID"]) if not res or "objectSid" not in res[0][1]: msg = "Determination user SID failed" ud.debug(ud.MODULE, ud.ERROR, msg) @@ -354,7 +361,7 @@ def check_ad_account(ad_domain_info, username, password, ucr=None): user_dn = res[0][0] - res = lo_ad.search(filter="(sAMAccountName=%s)" % username, base=user_dn, scope="base", attr=["tokenGroups"]) + res = lo_ad.search(filter=filter_format("(sAMAccountName=%s)", [username]), base=user_dn, scope="base", attr=["tokenGroups"]) if not res or "tokenGroups" not in res[0][1]: msg = "Lookup of AD group memberships for user failed" ud.debug(ud.MODULE, ud.ERROR, msg) @@ -380,7 +387,7 @@ def _sid_of_ucs_sambadomain(lo=None, ucr=None): ucr = univention.config_registry.ConfigRegistry() ucr.load() - res = lo.search(filter="(&(objectclass=sambadomain)(sambaDomainName=%s))" % ucr.get("windows/domain"), attr=["sambaSID"], unique=True) + res = lo.search(filter=filter_format("(&(objectclass=sambadomain)(sambaDomainName=%s))", [ucr.get("windows/domain")]), attr=["sambaSID"], unique=True) if not res: ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for sambaDomainName=%s" % ucr.get("windows/domain")) raise ldap.NO_SUCH_OBJECT({'desc': 'no object'}) @@ -403,7 +410,7 @@ def _dn_of_udm_domain_admins(lo=None, ucr=None): ucs_domain_sid = _sid_of_ucs_sambadomain(lo, ucr) domain_admins_sid = "%s-%s" % (ucs_domain_sid, security.DOMAIN_RID_ADMINS) - res = lo.searchDn(filter="(sambaSID=%s)" % domain_admins_sid, unique=True) + res = lo.searchDn(filter=filter_format("(sambaSID=%s)", [domain_admins_sid]), unique=True) if not res: ud.debug(ud.MODULE, ud.ERROR, "Failed to determine DN of UCS Domain Admins group") raise ldap.NO_SUCH_OBJECT({'desc': 'no object'}) @@ -490,7 +497,7 @@ def prepare_administrator(username, password, ucr=None): # First check if account exists in LDAP, otherwise create it: lo = univention.uldap.getMachineConnection() - res = lo.search(filter="(&(uid=%s)(objectClass=shadowAccount))" % (username,), attr=["userPassword", "sambaSID"]) + res = lo.search(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), attr=["userPassword", "sambaSID"]) if not res: ud.debug(ud.MODULE, ud.INFO, "No UCS LDAP search result for uid=%s" % username) try: @@ -536,26 +543,26 @@ def prepare_administrator(username, password, ucr=None): def _mapped_ad_dn(ad_dn, ad_ldap_base, ucr=None): - if ad_dn[-len(ad_ldap_base):] != ad_ldap_base: - ud.debug(ud.MODULE, ud.ERROR, "Mapping of AD DN %s failed, base is not %s" % (ad_dn, ad_ldap_base)) + """ + >>> _mapped_ad_dn('uid=Administrator + CN=admin,OU=users,CN=univention,Foo=univention,bar=base', 'foo=univention,bar = base', {'ldap/base': 'dc=base'}) + 'uid=Administrator+cn=admin,ou=users,cn=univention,dc=base' + """ + parent = ad_dn + while parent: + if univention.uldap.access.compare_dn(parent, ad_ldap_base): + break + parent = univention.uldap.parentDn(parent) + else: + ud.debug(ud.MODULE, ud.ERROR, "Mapping of AD DN %r failed, base is not %r" % (ad_dn, ad_ldap_base)) return if not ucr: ucr = univention.config_registry.ConfigRegistry() ucr.load() - relative_dn = ad_dn[:-len(ad_ldap_base) - 1] - mapped_relative_dn_components = [] - relative_dn_components = relative_dn.split(',') - for rdn in relative_dn_components: - attr, val = rdn.split('=') - if attr in ('CN', 'OU'): - attr = attr.lower() - mapped_rdn = '='.join((attr, val)) - mapped_relative_dn_components.append(mapped_rdn) - mapped_relative_dn = ','.join(mapped_relative_dn_components) - mapped_dn = ",".join((mapped_relative_dn, ucr.get("ldap/base"))) - return mapped_dn + base = ldap.dn.str2dn(ad_ldap_base) + dn = [[(attr[0].lower() if attr[0] in ('CN', 'OU') else attr[0], attr[1], attr[2]) for attr in x] for x in ldap.dn.str2dn(ad_dn)[:-len(base)]] + return ldap.dn.dn2str(dn + ldap.dn.str2dn(ucr.get("ldap/base"))) def synchronize_account_position(ad_domain_info, username, password, ucr=None): @@ -590,7 +597,7 @@ def synchronize_account_position(ad_domain_info, username, password, ucr=None): except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM): return False # Massive failure, but no issue to be raised here. - res = lo_ad.searchDn(filter="(sAMAccountName=%s)" % username) + res = lo_ad.searchDn(filter=filter_format("(sAMAccountName=%s)", [username])) if not res: ud.debug(ud.MODULE, ud.ERROR, "Lookup of AD DN for user %s failed" % username) return False # Massive failure, but no issue to be raised here. @@ -598,7 +605,7 @@ def synchronize_account_position(ad_domain_info, username, password, ucr=None): # Second determine position in UCS LDAP: lo = univention.uldap.getMachineConnection() - res = lo.searchDn(filter="(&(uid=%s)(objectClass=shadowAccount))" % (username,), unique=True) + res = lo.searchDn(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), unique=True) if not res: ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for uid=%s" % username) return False # Massive failure, but no issue to be raised here. @@ -608,7 +615,7 @@ def synchronize_account_position(ad_domain_info, username, password, ucr=None): return True mapped_ad_user_dn = _mapped_ad_dn(ad_user_dn, ad_ldap_base, ucr) - target_position = mapped_ad_user_dn.split(',', 1)[1] + target_position = lo.parentDn(mapped_ad_user_dn) cmd = ("univention-directory-manager", "users/user", "move", "--dn", ucs_user_dn, "--position", target_position) p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) @@ -664,14 +671,14 @@ def disable_ssl(): def _add_service_to_localhost(service): ud.debug(ud.MODULE, ud.PROCESS, "Adding service %s to localhost" % service) - res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_addServiceToLocalhost "%s"' % service, shell=True) + res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_addServiceToLocalhost %s' % (pipes.quote(service),), shell=True) if res != 0: raise failedToSetService def _remove_service_from_localhost(service): ud.debug(ud.MODULE, ud.PROCESS, "Remove service %s from localhost" % service) - res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_removeServiceFromLocalhost "%s"' % service, shell=True) + res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_removeServiceFromLocalhost %s' % (pipes.quote(service),), shell=True) if res != 0: raise failedToSetService @@ -977,7 +984,7 @@ def rename_well_known_sid_objects(username, password, ucr=None): ucs_domain_sid = _sid_of_ucs_sambadomain(lo, ucr) domain_admins_sid = "%s-%s" % (ucs_domain_sid, security.DOMAIN_RID_ADMINS) - res = lo.search(filter="(&(sambaSID=%s)(objectClass=sambaGroupMapping))" % domain_admins_sid, attr=["cn"], unique=True) + res = lo.search(filter=filter_format("(&(sambaSID=%s)(objectClass=sambaGroupMapping))", [domain_admins_sid]), attr=["cn"], unique=True) if not res or "cn" not in res[0][1]: ud.debug(ud.MODULE, ud.ERROR, "Lookup of group name for Domain Admins sid failed") domain_admins_name = "Domain Admins" # sensible guess @@ -1001,7 +1008,7 @@ def rename_well_known_sid_objects(username, password, ucr=None): raise connectionFailed(msg) # Finally wait for replication and slapd restart to ensure that new LDAP ACLs are active: - res = lo.search(filter="(&(sambaSID=%s)(objectClass=sambaGroupMapping))" % domain_admins_sid, attr=["cn"], unique=True) + res = lo.search(filter=filter_format("(&(sambaSID=%s)(objectClass=sambaGroupMapping))", [domain_admins_sid]), attr=["cn"], unique=True) if not res or "cn" not in res[0][1]: ud.debug(ud.MODULE, ud.ERROR, "Lookup of new group name for Domain Admins sid failed") new_domain_admins_name = "Domain Admins" @@ -1075,7 +1082,7 @@ def prepare_dns_reverse_settings(ad_domain_info, ucr=None): except (socket.herror, socket.gaierror) as exc: ud.debug(ud.MODULE, ud.INFO, "Resolving %s failed: %s" % (ad_domain_info['DC IP'], exc.args[1])) - ## Set a hosts/static anyway, to be safe from DNS issues (Bug #38285) + # Set a hosts/static anyway, to be safe from DNS issues (Bug #38285) previous_ucr_set = [] previous_ucr_unset = [] @@ -1097,6 +1104,7 @@ def prepare_dns_reverse_settings(ad_domain_info, ucr=None): return (previous_ucr_set, previous_ucr_unset) + def prepare_kerberos_ucr_settings(realm=None, ucr=None): ud.debug(ud.MODULE, ud.PROCESS, "Prepare Kerberos UCR settings") @@ -1252,7 +1260,7 @@ def run_samba_join_script(username, password, ucr=None): ud.debug(ud.MODULE, ud.PROCESS, "Running samba join script") lo = univention.uldap.getMachineConnection() - res = lo.searchDn(filter="(&(uid=%s)(objectClass=shadowAccount))" % (username,), unique=True) + res = lo.searchDn(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), unique=True) if not res: ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for uid=%s" % username) raise sambaJoinScriptFailed() @@ -1283,9 +1291,9 @@ def add_host_record_in_ad(uid=None, binddn=None, bindpw=None, bindpwdfile=None, domainname = ucr.get('domainname') if binddn: - for i in binddn.split(','): - if i.lower().startswith('uid='): - uid = i.split('=', 1)[1] + uids = [y[1] for x in ldap.dn.str2dn(binddn) for y in x if ('uid' in y)] + if uids: + uid = uids[0] if bindpwdfile: create_pwdfile = False pwdfile = bindpwdfile @@ -1328,7 +1336,7 @@ def add_host_record_in_ad(uid=None, binddn=None, bindpw=None, bindpwdfile=None, print('%s A record for %s found' % (fqdn, ip)) return True - # create host record + # create host record # FIXME; missing quoting fd = tempfile.NamedTemporaryFile(delete=False) fd.write('server %s\n' % ad_ip) fd.write('update add %s 86400 A %s\n' % (fqdn, ip)) @@ -1410,6 +1418,7 @@ def add_domaincontroller_srv_record_in_ad(ad_ip, username, password, ucr=None): with tempfile.NamedTemporaryFile() as fd, tempfile.NamedTemporaryFile() as fd2: fd2.write(password) fd2.flush() + # FIXME: missing quoting fd.write('server %s\n' % ad_ip) fd.write('update delete %s. SRV\n' % (srv_record,)) fd.write('send\n') @@ -1424,6 +1433,7 @@ def add_domaincontroller_srv_record_in_ad(ad_ip, username, password, ucr=None): ud.debug(ud.MODULE, ud.ERROR, "failed to remove SRV record. Ignoring error.") subprocess.call(['kdestroy']) + # FIXME: missing quoting fd = tempfile.NamedTemporaryFile(delete=False) fd.write('server %s\n' % ad_ip) fd.write('update add %s. 10800 SRV 0 0 0 %s\n' % @@ -1454,7 +1464,7 @@ def add_domaincontroller_srv_record_in_ad(ad_ip, username, password, ucr=None): def get_ucr_variable_from_ucs(host, server, var): cmd = ['univention-ssh', '/etc/machine.secret'] cmd += ['%s\$@%s' % (host, server)] - cmd += ['/usr/sbin/ucr get %s' % var] + cmd += ['/usr/sbin/ucr get %s' % (pipes.quote(var),)] p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p1.communicate() if p1.returncode: