Univention Bugzilla – Bug 44373
S4C: DN escaping difference in `object_memberships_sync()` (usernames with "#)
Last modified: 2020-07-08 00:14:02 CEST
Created attachment 8776
Test-script that demonstrates the different DN escapings.

OpenLDAP and Samba-LDAP behave differently when accessed via the Python-API: they escape DNs differently. This is an issue in at least UCS 4.1/4.2.

Attached is a short test-script that demonstrates the issue. Here is the output:

    Testing with S4
    User 's4_testuser_F2HC7GEMU8_#' created successfully
    s4 CN=s4_testuser_F2HC7GEMU8_\#,CN=Users,DC=loyen,DC=intranet
    s4 normalized CN=s4_testuser_F2HC7GEMU8_#,CN=Users,DC=loyen,DC=intranet
    Testing with UCS
    Object created: uid=ucs_testuser_O35ZX0XP3E_#\",cn=users,dc=loyen,dc=intranet
    ucs uid=ucs_testuser_O35ZX0XP3E_#\22,cn=users,dc=loyen,dc=intranet
    ucs normalized uid=ucs_testuser_O35ZX0XP3E_#\",cn=users,dc=loyen,dc=intranet

The S4-LDAP escapes the #, UCS does not. A similar difference occurs with ", but `samba-tool` prevents creating a user with a " as part of the username.

This surfaces in the S4-Connector during group-membership sync. There, members are determined by comparing lowercase versions of DNs. Some of those DNs come directly from the Open/Samba-LDAP via the Python-API, some are constructed via the DN-mapping. Since `"CN=user\#,CN=Users,..".lower() != "CN=user#,CN=Users,..".lower()`, the S4-Connector may not recognize an existing member and try to add it again. This results in an LDAP-error, as the member already exists.

Normalizing the DNs with the following function solves the issue. This would be necessary for all DN comparisons within the group-membership sync.

    import ldap.dn

    def normalize_dn(dn):
        return ldap.dn.dn2str(ldap.dn.str2dn(dn))

This would also affect the `group_members_cache_{ucs,con}`. It may be worthwhile to replace the raw dictionaries with something like the following, which takes care of normalizing the DNs and performs only lowercase comparisons.
    class GroupMembersCache(object):

        def __init__(self, name='', initial=None):
            self._name = name
            self._cache = dict(initial or {})

        def __repr__(self):
            return '{}({}, {})'.format(self.__class__.__name__, self._name, repr(self._cache))

        def __str__(self):
            return repr(self)

        def clear(self, group_dn, log=False):
            group_dn_nn = normalize_dn(group_dn.lower())
            self._cache.setdefault(group_dn_nn, set()).clear()
            if log:
                msg = "{}.clear: {}: cleared group cache of {}"
                ud.debug(ud.LDAP, ud.INFO, msg.format(
                    self.__class__.__name__, self._name, group_dn_nn))

        def set(self, group_dn, member_dn, log=False):
            group_dn_nn = normalize_dn(group_dn.lower())
            member_dn_nn = normalize_dn(member_dn.lower())
            self._cache.setdefault(group_dn_nn, set()).add(member_dn_nn)
            if log:
                msg = "{}.set: {}: append user {} to group cache of {}"
                ud.debug(ud.LDAP, ud.INFO, msg.format(
                    self.__class__.__name__, self._name, member_dn_nn, group_dn_nn))

        def update(self, group_dn, member_dn_iterator):
            group_dn_nn = normalize_dn(group_dn.lower())
            group = self._cache.setdefault(group_dn_nn, set())
            group.update(normalize_dn(m.lower()) for m in member_dn_iterator)

        def get(self, group_dn):
            group_dn_nn = normalize_dn(group_dn.lower())
            return list(self._cache.get(group_dn_nn, set()))

        def cache_has(self, group_dn, member_dn):
            group_dn_nn = normalize_dn(group_dn.lower())
            member_dn_nn = normalize_dn(member_dn.lower())
            return member_dn_nn in self._cache.get(group_dn_nn, set())
Note: we have a comparison function: lo.compare_dn(a, b) → bool
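
For illustration only, the mismatch from the first comment can be reproduced without an LDAP server or python-ldap. The sketch below is a simplified, pure-stdlib stand-in for an escaping-insensitive DN comparison. It is not how `lo.compare_dn` or `ldap.dn.str2dn` are implemented, and the helper names (`split_unescaped`, `unescape_value`, `dn_key`, `same_dn`) are made up for this example. It assumes single-valued RDNs and ignores `+` and `#hexstring` values.

```python
import re

_HEX_PAIR = re.compile(r"[0-9a-fA-F]{2}")


def split_unescaped(text, sep):
    """Split text on sep, skipping separators that are backslash-escaped."""
    parts, current, i = [], [], 0
    while i < len(text):
        ch = text[i]
        if ch == "\\" and i + 1 < len(text):
            # Keep the escape sequence intact; it is decoded later.
            current.append(text[i:i + 2])
            i += 2
        elif ch == sep:
            parts.append("".join(current))
            current = []
            i += 1
        else:
            current.append(ch)
            i += 1
    parts.append("".join(current))
    return parts


def unescape_value(value):
    """Decode backslash escapes: hex pairs such as \\22 and single chars such as \\#."""
    out = bytearray()
    i = 0
    while i < len(value):
        ch = value[i]
        if ch == "\\" and _HEX_PAIR.fullmatch(value[i + 1:i + 3]):
            out.append(int(value[i + 1:i + 3], 16))
            i += 3
        elif ch == "\\" and i + 1 < len(value):
            out.extend(value[i + 1].encode("utf-8"))
            i += 2
        else:
            out.extend(ch.encode("utf-8"))
            i += 1
    return out.decode("utf-8")


def dn_key(dn):
    """Return a case-folded, escaping-independent key for a DN string."""
    key = []
    for rdn in split_unescaped(dn, ","):
        attr, _, value = rdn.partition("=")
        key.append((attr.strip().lower(), unescape_value(value).lower()))
    return tuple(key)


def same_dn(a, b):
    """Compare two DN strings regardless of case and escaping style."""
    return dn_key(a) == dn_key(b)


s4_dn = r"CN=user\#,CN=Users,DC=loyen,DC=intranet"
ucs_dn = "cn=user#,cn=users,dc=loyen,dc=intranet"

print(s4_dn.lower() == ucs_dn.lower())  # plain lowercase comparison
print(same_dn(s4_dn, ucs_dn))           # comparison on decoded values
```

In the connector itself, `normalize_dn()` or `lo.compare_dn()` would be the natural choice. The point of the sketch is only that the comparison has to operate on decoded RDN values rather than on the raw strings.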