#!/usr/share/ucs-test/runner python
## desc: smbstatus parser
## roles: [domaincontroller_master, domaincontroller_slave]
## tags: [apptest, ucsschool]
## exposure: dangerous
## packages: [univention-samba4]
"""Check that SMB_Status lists the smbclient sessions opened by this test."""

import socket
import subprocess
import time

import univention.testing.ucr as ucr_test
import univention.testing.utils as utils
from ucsschool.lib.smbstatus import SMB_Status

# Shares the test connects to; one smbclient session is opened per share and
# one matching entry is expected in the parsed smbstatus output.
SHARES = ('netlogon', 'sysvol', 'IPC$')

# Seconds to wait for the smbclient sessions to be established.
CONNECTION_WAIT = 10

# Attributes every parsed smbstatus entry must provide a non-empty value for.
REQUIRED_ATTRIBUTES = (
    'pid',
    'username',
    'group',
    'machine',
    'services',
    'ipaddress',
)


def _get_process_by_services(status, services):
    """Return the first entry of *status* whose services equal *services*.

    The comparison ignores order and duplicates (both sides are compared as
    sets).  Returns None when no entry matches.
    """
    wanted = set(services)
    for process in status:
        if set(process.services) == wanted:
            return process
    return None


def _stop_clients(clients):
    """Terminate and reap every smbclient process that is still running."""
    for client in clients:
        # poll() returning a value means the child already exited and was
        # reaped; signalling it again is pointless and may raise OSError.
        if client.poll() is None:
            client.terminate()
            client.wait()


def main():
    """Open one smbclient session per share and verify the smbstatus parser.

    For every share in SHARES the parsed SMB_Status result must contain an
    entry whose services match that share and whose username and IP address
    match the domain admin account and this host.  Problems are reported via
    utils.fail.
    """
    with ucr_test.UCSTestConfigRegistry() as ucr:
        account = utils.UCSTestDomainAdminCredentials()
        admin = account.username
        passwd = account.bindpw
        host = ucr.get('hostname')

        clients = []
        try:
            # build up smb connections; stdin is a pipe so that smbclient
            # stays at its prompt and keeps the session open
            for share in SHARES:
                command = [
                    'smbclient',
                    '-U', '%s%%%s' % (admin, passwd),
                    '//%s/%s' % (host, share),
                ]
                clients.append(subprocess.Popen(command, stdin=subprocess.PIPE))

            # wait for the connections to establish
            time.sleep(CONNECTION_WAIT)

            status = SMB_Status()
            # Single-argument call form: same output as the former
            # "print 'smbstatus = ', status" on Python 2, valid on Python 3.
            print('smbstatus =  %s' % (status,))
            if not status:
                utils.fail('smbclient was not able to open any connection to host (%s)' % host)

            ipaddress = get_ipaddress()
            for share in SHARES:
                expected_values = {
                    'services': [share],
                    'username': admin,
                    'ipaddress': ipaddress,
                }
                process = _get_process_by_services(status, expected_values['services'])
                if not process:
                    utils.fail('The process with services %s was not recognized by smbstatus' % (expected_values['services'],))
                check_attributes(process, expected_values)
        finally:
            # Clean up the clients even when a check above fails or raises;
            # previously they were only terminated on the success path.
            _stop_clients(clients)


def get_ipaddress():
    """Return the IP address the local host name resolves to."""
    return socket.gethostbyname(socket.gethostname())


def check_attributes(process, expected_values):
    """Verify the attributes of one parsed smbstatus entry.

    Every attribute in REQUIRED_ATTRIBUTES must be present with a non-empty
    value.  Attributes that also appear in *expected_values* must equal the
    expected value; for 'ipaddress' everything from the last ':' on is
    stripped before comparing.  Violations are reported via utils.fail.

    :param process: entry to check; attributes are read with getattr() and,
        if that raises AttributeError, with process.get().
    :param expected_values: dict mapping attribute names to expected values.
    """
    for attr in REQUIRED_ATTRIBUTES:
        try:
            value = getattr(process, attr)
        except AttributeError:
            # fall back to mapping-style access
            value = process.get(attr)
        if not value:
            utils.fail('Could not fetch the attribute %s' % (attr,))

        if attr not in expected_values:
            continue
        if attr == 'ipaddress':
            # presumably drops a trailing ":port" suffix -- verify against
            # the SMB_Status parser
            value = value.rsplit(':', 1)[0]
        if value != expected_values[attr]:
            utils.fail('Attribute (%s) is parsed wrong as (%s), expected in (%r)' % (attr, value, expected_values[attr]))


if __name__ == '__main__':
    main()