Univention Bugzilla – Attachment 10023 Details for
Bug 46323
Configuring Active Directory connection: ValueError: need more than 1 value to unpack
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Requests
|
Help
|
New Account
|
Log In
[x]
|
Forgot Password
Login:
[x]
[patch]
patch (git:fbest/46323-fix-ad-ldap-escaping)
46323.patch (text/plain), 12.56 KB, created by
Florian Best
on 2019-05-13 13:59:56 CEST
(
hide
)
Description:
patch (git:fbest/46323-fix-ad-ldap-escaping)
Filename:
MIME Type:
Creator:
Florian Best
Created:
2019-05-13 13:59:56 CEST
Size:
12.56 KB
patch
obsolete
>commit 14448a9e8506ab2e52ed44a600e196dfc7a6529f >Author: Florian Best <best@univention.de> >Date: Mon May 13 13:58:55 2019 +0200 > > Bug #46323: fix escaping of ldap DN and filters, and shell arguments > >diff --git a/base/univention-lib/python/admember.py b/base/univention-lib/python/admember.py >index 0e34b61ac6..806aa46a5f 100644 >--- a/base/univention-lib/python/admember.py >+++ b/base/univention-lib/python/admember.py >@@ -31,9 +31,7 @@ > # <http://www.gnu.org/licenses/>. > > from __future__ import print_function >-import ldb >-import ldap >-import ldap.sasl >+ > import os > import subprocess > import locale >@@ -42,10 +40,17 @@ import tempfile > import ipaddr > import time > from datetime import datetime, timedelta >+import pipes >+ >+import ldb >+import ldap >+import ldap.sasl >+from ldap.filter import filter_format > from samba.dcerpc import nbt, security > from samba.ndr import ndr_unpack > from samba.net import Net > from samba.param import LoadParm >+ > import univention.config_registry > import univention.uldap > import univention.lib.package_manager >@@ -71,6 +76,7 @@ finally: > if orig_path: > sys.path = orig_path > >+ > # Ensure univention debug is initialized > def initialize_debug(): > # Use a little hack to determine if univention.debug has been initialized >@@ -88,6 +94,7 @@ def initialize_debug(): > else: > ud.set_level(ud.MODULE, oldLevel) > >+ > class failedToSetService(Exception): > > '''ucs_addServiceToLocalhost failed''' >@@ -337,7 +344,7 @@ def check_ad_account(ad_domain_info, username, password, ucr=None): > > domain_sid = ndr_unpack(security.dom_sid, res[0][1]["objectSid"][0]) > >- res = lo_ad.search(filter="(sAMAccountName=%s)" % username, attr=["objectSid", "primaryGroupID"]) >+ res = lo_ad.search(filter=filter_format("(sAMAccountName=%s)", [username]), attr=["objectSid", "primaryGroupID"]) > if not res or "objectSid" not in res[0][1]: > msg = "Determination user SID failed" > ud.debug(ud.MODULE, ud.ERROR, msg) >@@ -354,7 +361,7 @@ def check_ad_account(ad_domain_info, username, password, ucr=None): > > user_dn = res[0][0] > >- res = lo_ad.search(filter="(sAMAccountName=%s)" % username, base=user_dn, scope="base", attr=["tokenGroups"]) >+ res = lo_ad.search(filter=filter_format("(sAMAccountName=%s)", [username]), base=user_dn, scope="base", attr=["tokenGroups"]) > if not res or "tokenGroups" not in res[0][1]: > msg = "Lookup of AD group memberships for user failed" > ud.debug(ud.MODULE, ud.ERROR, msg) >@@ -380,7 +387,7 @@ def _sid_of_ucs_sambadomain(lo=None, ucr=None): > ucr = univention.config_registry.ConfigRegistry() > ucr.load() > >- res = lo.search(filter="(&(objectclass=sambadomain)(sambaDomainName=%s))" % ucr.get("windows/domain"), attr=["sambaSID"], unique=True) >+ res = lo.search(filter=filter_format("(&(objectclass=sambadomain)(sambaDomainName=%s))", [ucr.get("windows/domain")]), attr=["sambaSID"], unique=True) > if not res: > ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for sambaDomainName=%s" % ucr.get("windows/domain")) > raise ldap.NO_SUCH_OBJECT({'desc': 'no object'}) >@@ -403,7 +410,7 @@ def _dn_of_udm_domain_admins(lo=None, ucr=None): > > ucs_domain_sid = _sid_of_ucs_sambadomain(lo, ucr) > domain_admins_sid = "%s-%s" % (ucs_domain_sid, security.DOMAIN_RID_ADMINS) >- res = lo.searchDn(filter="(sambaSID=%s)" % domain_admins_sid, unique=True) >+ res = lo.searchDn(filter=filter_format("(sambaSID=%s)", [domain_admins_sid]), unique=True) > if not res: > ud.debug(ud.MODULE, ud.ERROR, "Failed to determine DN of UCS Domain Admins group") > raise ldap.NO_SUCH_OBJECT({'desc': 'no object'}) >@@ -490,7 +497,7 @@ def prepare_administrator(username, password, ucr=None): > > # First check if account exists in LDAP, otherwise create it: > lo = univention.uldap.getMachineConnection() >- res = lo.search(filter="(&(uid=%s)(objectClass=shadowAccount))" % (username,), attr=["userPassword", "sambaSID"]) >+ res = lo.search(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), attr=["userPassword", "sambaSID"]) > if not res: > ud.debug(ud.MODULE, ud.INFO, "No UCS LDAP search result for uid=%s" % username) > try: >@@ -536,26 +543,26 @@ def prepare_administrator(username, password, ucr=None): > > > def _mapped_ad_dn(ad_dn, ad_ldap_base, ucr=None): >- if ad_dn[-len(ad_ldap_base):] != ad_ldap_base: >- ud.debug(ud.MODULE, ud.ERROR, "Mapping of AD DN %s failed, base is not %s" % (ad_dn, ad_ldap_base)) >+ """ >+ >>> _mapped_ad_dn('uid=Administrator + CN=admin,OU=users,CN=univention,Foo=univention,bar=base', 'foo=univention,bar = base', {'ldap/base': 'dc=base'}) >+ 'uid=Administrator+cn=admin,ou=users,cn=univention,dc=base' >+ """ >+ parent = ad_dn >+ while parent: >+ if univention.uldap.access.compare_dn(parent, ad_ldap_base): >+ break >+ parent = univention.uldap.parentDn(parent) >+ else: >+ ud.debug(ud.MODULE, ud.ERROR, "Mapping of AD DN %r failed, base is not %r" % (ad_dn, ad_ldap_base)) > return > > if not ucr: > ucr = univention.config_registry.ConfigRegistry() > ucr.load() > >- relative_dn = ad_dn[:-len(ad_ldap_base) - 1] >- mapped_relative_dn_components = [] >- relative_dn_components = relative_dn.split(',') >- for rdn in relative_dn_components: >- attr, val = rdn.split('=') >- if attr in ('CN', 'OU'): >- attr = attr.lower() >- mapped_rdn = '='.join((attr, val)) >- mapped_relative_dn_components.append(mapped_rdn) >- mapped_relative_dn = ','.join(mapped_relative_dn_components) >- mapped_dn = ",".join((mapped_relative_dn, ucr.get("ldap/base"))) >- return mapped_dn >+ base = ldap.dn.str2dn(ad_ldap_base) >+ dn = [[(attr[0].lower() if attr[0] in ('CN', 'OU') else attr[0], attr[1], attr[2]) for attr in x] for x in ldap.dn.str2dn(ad_dn)[:-len(base)]] >+ return ldap.dn.dn2str(dn + ldap.dn.str2dn(ucr.get("ldap/base"))) > > > def synchronize_account_position(ad_domain_info, username, password, ucr=None): >@@ -590,7 +597,7 @@ def synchronize_account_position(ad_domain_info, username, password, ucr=None): > except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM): > return False # Massive failure, but no issue to be raised here. > >- res = lo_ad.searchDn(filter="(sAMAccountName=%s)" % username) >+ res = lo_ad.searchDn(filter=filter_format("(sAMAccountName=%s)", [username])) > if not res: > ud.debug(ud.MODULE, ud.ERROR, "Lookup of AD DN for user %s failed" % username) > return False # Massive failure, but no issue to be raised here. >@@ -598,7 +605,7 @@ def synchronize_account_position(ad_domain_info, username, password, ucr=None): > > # Second determine position in UCS LDAP: > lo = univention.uldap.getMachineConnection() >- res = lo.searchDn(filter="(&(uid=%s)(objectClass=shadowAccount))" % (username,), unique=True) >+ res = lo.searchDn(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), unique=True) > if not res: > ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for uid=%s" % username) > return False # Massive failure, but no issue to be raised here. >@@ -608,7 +615,7 @@ def synchronize_account_position(ad_domain_info, username, password, ucr=None): > return True > > mapped_ad_user_dn = _mapped_ad_dn(ad_user_dn, ad_ldap_base, ucr) >- target_position = mapped_ad_user_dn.split(',', 1)[1] >+ target_position = lo.parentDn(mapped_ad_user_dn) > > cmd = ("univention-directory-manager", "users/user", "move", "--dn", ucs_user_dn, "--position", target_position) > p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) >@@ -664,14 +671,14 @@ def disable_ssl(): > > def _add_service_to_localhost(service): > ud.debug(ud.MODULE, ud.PROCESS, "Adding service %s to localhost" % service) >- res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_addServiceToLocalhost "%s"' % service, shell=True) >+ res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_addServiceToLocalhost %s' % (pipes.quote(service),), shell=True) > if res != 0: > raise failedToSetService > > > def _remove_service_from_localhost(service): > ud.debug(ud.MODULE, ud.PROCESS, "Remove service %s from localhost" % service) >- res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_removeServiceFromLocalhost "%s"' % service, shell=True) >+ res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_removeServiceFromLocalhost %s' % (pipes.quote(service),), shell=True) > if res != 0: > raise failedToSetService > >@@ -977,7 +984,7 @@ def rename_well_known_sid_objects(username, password, ucr=None): > ucs_domain_sid = _sid_of_ucs_sambadomain(lo, ucr) > > domain_admins_sid = "%s-%s" % (ucs_domain_sid, security.DOMAIN_RID_ADMINS) >- res = lo.search(filter="(&(sambaSID=%s)(objectClass=sambaGroupMapping))" % domain_admins_sid, attr=["cn"], unique=True) >+ res = lo.search(filter=filter_format("(&(sambaSID=%s)(objectClass=sambaGroupMapping))", [domain_admins_sid]), attr=["cn"], unique=True) > if not res or "cn" not in res[0][1]: > ud.debug(ud.MODULE, ud.ERROR, "Lookup of group name for Domain Admins sid failed") > domain_admins_name = "Domain Admins" # sensible guess >@@ -1001,7 +1008,7 @@ def rename_well_known_sid_objects(username, password, ucr=None): > raise connectionFailed(msg) > > # Finally wait for replication and slapd restart to ensure that new LDAP ACLs are active: >- res = lo.search(filter="(&(sambaSID=%s)(objectClass=sambaGroupMapping))" % domain_admins_sid, attr=["cn"], unique=True) >+ res = lo.search(filter=filter_format("(&(sambaSID=%s)(objectClass=sambaGroupMapping))", [domain_admins_sid]), attr=["cn"], unique=True) > if not res or "cn" not in res[0][1]: > ud.debug(ud.MODULE, ud.ERROR, "Lookup of new group name for Domain Admins sid failed") > new_domain_admins_name = "Domain Admins" >@@ -1075,7 +1082,7 @@ def prepare_dns_reverse_settings(ad_domain_info, ucr=None): > except (socket.herror, socket.gaierror) as exc: > ud.debug(ud.MODULE, ud.INFO, "Resolving %s failed: %s" % (ad_domain_info['DC IP'], exc.args[1])) > >- ## Set a hosts/static anyway, to be safe from DNS issues (Bug #38285) >+ # Set a hosts/static anyway, to be safe from DNS issues (Bug #38285) > previous_ucr_set = [] > previous_ucr_unset = [] > >@@ -1097,6 +1104,7 @@ def prepare_dns_reverse_settings(ad_domain_info, ucr=None): > > return (previous_ucr_set, previous_ucr_unset) > >+ > def prepare_kerberos_ucr_settings(realm=None, ucr=None): > ud.debug(ud.MODULE, ud.PROCESS, "Prepare Kerberos UCR settings") > >@@ -1252,7 +1260,7 @@ def run_samba_join_script(username, password, ucr=None): > ud.debug(ud.MODULE, ud.PROCESS, "Running samba join script") > > lo = univention.uldap.getMachineConnection() >- res = lo.searchDn(filter="(&(uid=%s)(objectClass=shadowAccount))" % (username,), unique=True) >+ res = lo.searchDn(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), unique=True) > if not res: > ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for uid=%s" % username) > raise sambaJoinScriptFailed() >@@ -1283,9 +1291,9 @@ def add_host_record_in_ad(uid=None, binddn=None, bindpw=None, bindpwdfile=None, > domainname = ucr.get('domainname') > > if binddn: >- for i in binddn.split(','): >- if i.lower().startswith('uid='): >- uid = i.split('=', 1)[1] >+ uids = [y[1] for x in ldap.dn.str2dn(binddn) for y in x if ('uid' in y)] >+ if uids: >+ uid = uids[0] > if bindpwdfile: > create_pwdfile = False > pwdfile = bindpwdfile >@@ -1328,7 +1336,7 @@ def add_host_record_in_ad(uid=None, binddn=None, bindpw=None, bindpwdfile=None, > print('%s A record for %s found' % (fqdn, ip)) > return True > >- # create host record >+ # create host record # FIXME; missing quoting > fd = tempfile.NamedTemporaryFile(delete=False) > fd.write('server %s\n' % ad_ip) > fd.write('update add %s 86400 A %s\n' % (fqdn, ip)) >@@ -1410,6 +1418,7 @@ def add_domaincontroller_srv_record_in_ad(ad_ip, username, password, ucr=None): > with tempfile.NamedTemporaryFile() as fd, tempfile.NamedTemporaryFile() as fd2: > fd2.write(password) > fd2.flush() >+ # FIXME: missing quoting > fd.write('server %s\n' % ad_ip) > fd.write('update delete %s. SRV\n' % (srv_record,)) > fd.write('send\n') >@@ -1424,6 +1433,7 @@ def add_domaincontroller_srv_record_in_ad(ad_ip, username, password, ucr=None): > ud.debug(ud.MODULE, ud.ERROR, "failed to remove SRV record. Ignoring error.") > subprocess.call(['kdestroy']) > >+ # FIXME: missing quoting > fd = tempfile.NamedTemporaryFile(delete=False) > fd.write('server %s\n' % ad_ip) > fd.write('update add %s. 10800 SRV 0 0 0 %s\n' % >@@ -1454,7 +1464,7 @@ def add_domaincontroller_srv_record_in_ad(ad_ip, username, password, ucr=None): > def get_ucr_variable_from_ucs(host, server, var): > cmd = ['univention-ssh', '/etc/machine.secret'] > cmd += ['%s\$@%s' % (host, server)] >- cmd += ['/usr/sbin/ucr get %s' % var] >+ cmd += ['/usr/sbin/ucr get %s' % (pipes.quote(var),)] > p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) > stdout, stderr = p1.communicate() > if p1.returncode:
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 46323
: 10023