Univention Bugzilla – Attachment 8160 Details for
Bug 42775
CLI tool to check an account
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Requests
|
Help
|
New Account
|
Log In
[x]
|
Forgot Password
Login:
[x]
ucs-account-check
ucs-account-check (text/x-python), 8.49 KB, created by
Arvid Requate
on 2016-10-26 16:25 CEST
(
hide
)
Description:
ucs-account-check
Filename:
MIME Type:
Creator:
Arvid Requate
Created:
2016-10-26 16:25 CEST
Size:
8.49 KB
patch
obsolete
>#!/usr/bin/env python >import sys >import ldap >import univention.uldap >import univention.admin.uldap >import univention.admin.modules >from ldap.controls import SimplePagedResultsControl, RequestControl >LDB_CONTROL_DOMAIN_SCOPE_OID = "1.2.840.113556.1.4.1339" >LDAP_SERVER_SHOW_DELETED_OID = "1.2.840.113556.1.4.417" >import subprocess >import univention.config_registry > >class Main(object): > account_modules = ("users/user", "groups/group", > "computers/windows", "computers/windows_domaincontroller", > "computers/domaincontroller_master", "computers/domaincontroller_backup", "computers/domaincontroller_slave", > "computers/memberserver", "computers/ipmanagedclient", > "computers/ubuntu", "computers/linux", "computers/macos", > "computers/trustaccount" > ) > > def __call__(self, accountnames): > self.ucr = univention.config_registry.ConfigRegistry() > self.ucr.load() > > self.ucs_ldap = univention.uldap.getMachineConnection() > self.pos = univention.admin.uldap.position(self.ucs_ldap.base) > univention.admin.modules.update() > self.udm_modules = [] > for module_name in self.account_modules: > _udm_module = univention.admin.modules.get(module_name) > univention.admin.modules.init(self.ucs_ldap, self.pos, _udm_module) > self.udm_modules.append((module_name, _udm_module)) > > for accountname in accountnames: > results = self.search_in_openldap(accountname) > for (account_dn, account_obj) in results: > (account_type, account_options) = self.identify_account(account_dn, account_obj) > if "posix" in account_options: > self.check_posix_account(accountname, account_type, account_options, account_dn, account_obj) > if "samba" in account_options: > self.check_samba_account(accountname, account_type, account_options, account_dn, account_obj) > if "kerberos" in account_options: > self.check_kerberos_principal(accountname, account_type, account_options, account_dn, account_obj) > > def udm_module_for_account(self, account_dn, account_obj): > for (module_name, _udm_module) in self.udm_modules: > if _udm_module.identify(account_dn, account_obj): > return (module_name, _udm_module) > > def identify_account(self, account_dn, account_obj): > (module_name, _udm_module) = self.udm_module_for_account(account_dn, account_obj) > udm_obj = univention.admin.objects.get(_udm_module, co="", lo=self.ucs_ldap, position="", dn=account_dn) > udm_obj.open() > return (module_name, udm_obj.options) > > def search_in_openldap(self, accountname, ): > user_filter = "(&(uid=%s)(|(uidNumber=*)(sambaSID=*)(krb5PrincipalName=*)))" % accountname > group_filter = "(&(cn=%s)(|(gidNumber=*)(sambaSID=*)))" % accountname > filter = "(|%s%s)" % (user_filter, group_filter) > # results = self.ucs_ldap.search( filter = filter, base = "", scope = "sub", attr = [], unique = 0, required = 0, timeout = -1, sizelimit = 0) > results = self.search_ldap(filter = filter) > return results > > def search_ldap(self, base = "", scope = ldap.SCOPE_SUBTREE, filter = "(objectClass=*)", attrlist = [], show_deleted = True): > PAGE_SIZE = 1000 > ctrls=[ > SimplePagedResultsControl(True, PAGE_SIZE, ""), ## Must be the first > ] > > if not base: > base=self.ucs_ldap.base > > if show_deleted: > ctrls.append(RequestControl(LDAP_SERVER_SHOW_DELETED_OID)) > > msgid = self.ucs_ldap.lo.search_ext(base=base, scope=scope, filterstr=filter, attrlist=attrlist, serverctrls=ctrls, timeout=-1, sizelimit=0) > > res = [] > pages = 0 > while True: > pages += 1 > rtype, rdata, rmsgid, serverctrls = self.ucs_ldap.lo.result3(msgid) > res += rdata > > pctrls = [ > c > for c in serverctrls > if c.controlType == SimplePagedResultsControl.controlType > ] > if pctrls: > cookie = pctrls[0].cookie > if cookie: > if pages > 1: > print("LDAP search continues, already found %s objects" % len(res)) > ctrls[0].cookie = cookie > msgid = self.ucs_ldap.lo.search_ext(base, scope, filter, attrlist, serverctrls=ctrls, timeout=-1, sizelimit=0) > else: > break > else: > print("LDAP ignores PAGE_RESULTS") > break > > return res > > def check_posix_account(self, accountname, account_type, account_options, account_dn, account_obj): > if account_type == "groups/group": > self.check_posix_group(accountname, account_type, account_options, account_dn, account_obj) > else: > self.check_posix_user(accountname, account_type, account_options, account_dn, account_obj) > > def check_posix_user(self, accountname, account_type, account_options, account_dn, account_obj): > print "## Posix User" > cmd = ["id", accountname] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > print out.strip() > print > return True > > def check_posix_group(self, accountname, account_type, account_options, account_dn, account_obj): > print "## Posix Group" > cmd = ["getent", "group", accountname] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > print out.strip() > print > return True > > def check_samba_account(self, accountname, account_type, account_options, account_dn, account_obj): > print "### Samba account" > sid = None > wbinfo_accountname = None > > cmd = ["wbinfo", "-n", accountname] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > if p.returncode != 0: > print "ERROR during", " ".join(cmd) > print out > if not out: > print "ERROR during", " ".join(cmd) > print "no output" > else: > sid = out.split()[0] > if not sid: > print "ERROR: wbinfo reported no SID" > return False > else: > if account_obj.get("sambaSID", [None])[0] != sid: > print "ERROR: wbinfo reported SID '%s'" % sid > return False > > cmd = ["wbinfo", "-s", sid] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > if p.returncode != 0: > print "ERROR during", " ".join(cmd) > print out > if not out: > print "ERROR during", " ".join(cmd) > print "no output" > else: > wbinfo_accountname = out.rsplit(" ", 1)[0] > > if account_type == "groups/group": > wbinfo_sid_to_xid_option = "-Y" > wbinfo_xid_to_sid_option = "-G" > else: > wbinfo_sid_to_xid_option = "-S" > wbinfo_xid_to_sid_option = "-U" > > cmd = ["wbinfo", wbinfo_sid_to_xid_option, sid] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > if p.returncode != 0: > print "ERROR during", " ".join(cmd) > print out > if not out: > print "ERROR during", " ".join(cmd) > print "no output" > else: > xid_number = out.strip() > > cmd = ["wbinfo", wbinfo_xid_to_sid_option, xid_number] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > if p.returncode != 0: > print "ERROR during", " ".join(cmd) > print out > if not out: > print "ERROR during", " ".join(cmd) > print "no output" > else: > if out.strip() != sid: > print "ERROR during", " ".join(cmd) > print "SID returned: '%s'" % (out.strip(),) > print "OK" > print > return True > > > def check_kerberos_principal(self, accountname, account_type, account_options, account_dn, account_obj): > print "### Kerberos principal" > if self.ucr["ldap/hostdn"] == account_dn: > cmd = ["kinit", "--password-file=/etc/machine.secret", accountname] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > if p.returncode != 0: > print "ERROR during", " ".join(cmd) > print out > else: > print "Success obtaining Kerberos ticket with machine secret" > > ldap_servers = self.ucr.get("ldap/server/name", "").split() > if len(ldap_servers) > 0: > cmd = ["ldapsearch", "-Y", "GSSAPI", "-LL", "-h", ldap_servers[0], "-p", "389", "-s", "base", "dn"] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > if p.returncode != 0: > print "ERROR during", " ".join(cmd) > print "Kerberized LDAP search (port 389) failed" > else: > print "Kerberized LDAP search (port 389) successfull" > > cmd = ["kinit", "-t", "/etc/krb5.keytab", "host/%s.%s" % (self.ucr["hostname"], self.ucr["domainname"])] > p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) > out, err = p.communicate() > if p.returncode != 0: > print "ERROR during", " ".join(cmd) > print out > else: > print "Success obtaining Kerberos ticket with keytab" > > return True > >if __name__ == "__main__": > main = Main() > main(sys.argv[1:])
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
Actions:
View
Attachments on
bug 42775
: 8160