The set/password endpoint of the UMC server uses PAM to perform the password
change. In UCS, this calls pam_krb5, which contacts the Kerberos server. There,
the old password is checked and UDM is called to actually perform the password
change.

In UMS, there is no Kerberos server, hence the set/password handler is modified
here to use PAM only for verifying the old password, then UDM is called
directly to modify the password.

Issue: https://git.knut.univention.de/univention/customers/dataport/team-souvap/-/issues/311

NOTE(review): the indentation of the hunks below was reconstructed as four
spaces per level; confirm that it matches the target session.py before applying.

diff --git /usr/lib/python3/dist-packages/univention/management/console/session.py /usr/lib/python3/dist-packages/univention/management/console/session.py
index 6973b4c001..32670d5f87 100644
--- /usr/lib/python3/dist-packages/univention/management/console/session.py
+++ /usr/lib/python3/dist-packages/univention/management/console/session.py
@@ -41,16 +41,18 @@ import tornado.gen
 from ldap.filter import filter_format
 
 import univention.admin.uexceptions as udm_errors
+import univention.admin.handlers.users.user as udm_user
 
 from .acl import ACLs, LDAP_ACLs
 from .auth import AuthHandler
 from .category import Manager as CategoryManager
 from .config import MODULE_DEBUG_LEVEL, ucr
-from .error import ServiceUnavailable
+from .error import ServiceUnavailable, UMC_Error
 from .ldap import get_machine_connection, reset_cache as reset_ldap_connection_cache
 from .log import CORE
 from .message import Request
 from .module import Manager as ModuleManager
+from .pam import AuthenticationFailed, PasswordChangeFailed, PasswordExpired
 
 
 try:
@@ -153,14 +155,81 @@ class Session(object):
         return result
 
     async def change_password(self, args):
-        from .server import pool
-        pam = self.__auth.get_handler(args['locale'])
+        """Change a user's password by writing it through UDM.
+
+        PAM is used only to verify the current password; the new password
+        is then set on the user's UDM object via the machine LDAP connection.
+
+        :param args: request arguments. The keys ``username``, ``locale`` and
+            ``new_password`` are read here; ``new_password`` is popped before
+            the remaining arguments are handed to the authentication handler.
+        :raises PasswordChangeFailed: if the current password is rejected, no
+            LDAP connection or matching user entry is available, or the UDM
+            modification fails with an unexpected error.
+        :raises UMC_Error: if UDM raises ``pwToShort``, ``pwQuality`` or
+            ``pwalreadyused`` for the new password.
+        """
         username = args['username']
-        password = args['password']
-        new_password = args['new_password']
-        future = pool.submit(pam.change_password, username, password, new_password)
-        await asyncio.wrap_future(future)
-        pam.end()
+        locale = args['locale']
+        language = locale.split('_', 1)[0]
+        new_password = args.pop('new_password')
+
+        from .server import pool
+        pam = self.__auth.get_handler(locale)
+        try:
+            future = pool.submit(self.__auth.authenticate, pam, args)
+            result = await asyncio.wrap_future(future)
+            authenticated = bool(result)
+            CORE.info("Authentication for %s: %s" % (username, str(result)))
+        except PasswordExpired as exc:
+            # Treated as a successful check of the old password; presumably PAM
+            # reports expiry only after accepting the password - TODO confirm.
+            CORE.warn("Password for user %s is expired: %s" % (username, str(exc)))
+            authenticated = True
+        except AuthenticationFailed as exc:
+            CORE.error("Authentication failed: %s" % (str(exc),))
+            authenticated = False
+        finally:
+            # Release the PAM handle even if authentication raised unexpectedly.
+            pam.end()
+
+        if not authenticated:
+            message = pam._('The entered password does not match the current one.')
+            raise PasswordChangeFailed(message)
+
+        CORE.info("Setting new password for user: %s" % (username,))
+        lo = get_machine_connection(write=True)[0]
+        if not lo:
+            raise PasswordChangeFailed("LDAP connection failed")
+
+        user_dns = lo.searchDn(filter_format('(&(uid=%s)(objectClass=person))', (username,)))
+        if not user_dns:
+            # No matching entry: fail cleanly instead of raising IndexError.
+            raise PasswordChangeFailed("User %s not found in LDAP" % (username,))
+        user_dn = user_dns[0]
+        CORE.info("User dn: %s" % (user_dn,))
+        user = udm_user.object(None, lo, None, user_dn)
+        user.open()
+        user["password"] = new_password
+        user["pwdChangeNextLogin"] = 0
+        try:
+            user.modify()
+        except (udm_errors.pwToShort, udm_errors.pwQuality) as exc:
+            # Prefer the configured message for the request language, then the English one, then the UDM error text.
+            fallback_message = ucr.get('umc/login/password-complexity-message/en', str(exc))
+            password_complexity_message = ucr.get('umc/login/password-complexity-message/%s' % (language,), fallback_message)
+            raise UMC_Error(password_complexity_message) from exc
+        except udm_errors.pwalreadyused as exc:
+            raise UMC_Error(exc.message) from exc
+        except Exception as exc:
+            # Imported locally so this handler does not rely on a module-level
+            # "import traceback" that is not visible in this patch.
+            import traceback
+            CORE.error(f"udm_set_password(): failed to set password: {traceback.format_exc()}")
+            raise PasswordChangeFailed(str(exc)) from exc
+        else:
+            CORE.info("User modify succeeded!")
+
         self.set_credentials(username, new_password, None)
 
     def set_credentials(self, username, password, auth_type):