|
36 |
|
36 |
|
37 |
from ldap.dn import escape_dn_chars |
37 |
from ldap.dn import escape_dn_chars |
38 |
from univention.admin.uexceptions import noObject |
38 |
from univention.admin.uexceptions import noObject |
39 |
from ucsschool.importer.utils.ldap_connection import get_admin_connection |
39 |
from ucsschool.importer.utils.ldap_connection import get_admin_connection, get_machine_connection |
40 |
from ucsschool.importer.exceptions import FormatError |
40 |
from ucsschool.importer.exceptions import FormatError |
41 |
from ucsschool.importer.utils.logging import get_logger |
41 |
from ucsschool.importer.utils.logging import get_logger |
42 |
|
42 |
|
|
108 |
""" |
108 |
""" |
109 |
|
109 |
|
110 |
allowed_chars = string.ascii_letters + string.digits + "." |
110 |
allowed_chars = string.ascii_letters + string.digits + "." |
|
|
111 |
_mem_store = dict() |
111 |
|
112 |
|
112 |
def __init__(self, username_max_length): |
113 |
def __init__(self, username_max_length, dry_run=True): |
|
|
114 |
""" |
115 |
:param username_max_length: int: created usernames will be no longer |
116 |
than this |
117 |
:param dry_run: bool: if False use LDAP to store already-used usernames |
118 |
if True store for one run only in memory |
119 |
""" |
113 |
self.username_max_length = username_max_length |
120 |
self.username_max_length = username_max_length |
|
|
121 |
self.dry_run = dry_run |
114 |
self.logger = get_logger() |
122 |
self.logger = get_logger() |
115 |
self.connection, self.position = get_admin_connection() |
123 |
if self.dry_run: |
|
|
124 |
self.connection, self.position = get_machine_connection() |
125 |
else: |
126 |
self.connection, self.position = get_admin_connection() |
116 |
self.replacement_variable_pattern = re.compile(r'(%s)' % '|'.join(map(re.escape, self.counter_variable_to_function.keys())), flags=re.I) |
127 |
self.replacement_variable_pattern = re.compile(r'(%s)' % '|'.join(map(re.escape, self.counter_variable_to_function.keys())), flags=re.I) |
117 |
|
128 |
|
118 |
def add_to_ldap(self, username, first_number): |
129 |
def add_to_ldap(self, username, first_number): |
119 |
assert isinstance(username, basestring) |
130 |
assert isinstance(username, basestring) |
120 |
assert isinstance(first_number, basestring) |
131 |
assert isinstance(first_number, basestring) |
121 |
self.connection.add( |
132 |
if self.dry_run: |
122 |
"cn={},cn=unique-usernames,cn=ucsschool,cn=univention,{}".format( |
133 |
self._mem_store[username] = first_number |
123 |
escape_dn_chars(username), self.connection.base), |
134 |
else: |
124 |
[ |
135 |
self.connection.add( |
125 |
("objectClass", "ucsschoolUsername"), |
136 |
"cn={},cn=unique-usernames,cn=ucsschool,cn=univention,{}".format( |
126 |
("ucsschoolUsernameNextNumber", first_number) |
137 |
escape_dn_chars(username), self.connection.base), |
127 |
] |
138 |
[ |
128 |
) |
139 |
("objectClass", "ucsschoolUsername"), |
|
|
140 |
("ucsschoolUsernameNextNumber", first_number) |
141 |
] |
142 |
) |
129 |
|
143 |
|
130 |
def get_next_number(self, username): |
144 |
def _get_next_number_from_ldap(self, username): |
131 |
assert isinstance(username, basestring) |
|
|
132 |
try: |
145 |
try: |
133 |
return self.connection.get( |
146 |
return self.connection.get( |
134 |
"cn={},cn=unique-usernames,cn=ucsschool,cn=univention,{}".format( |
147 |
"cn={},cn=unique-usernames,cn=ucsschool,cn=univention,{}".format( |
|
137 |
except KeyError: |
150 |
except KeyError: |
138 |
raise noObject("Username '{}' not found.".format(username)) |
151 |
raise noObject("Username '{}' not found.".format(username)) |
139 |
|
152 |
|
|
|
153 |
def get_next_number(self, username): |
154 |
assert isinstance(username, basestring) |
155 |
if self.dry_run: |
156 |
try: |
157 |
res = self._mem_store[username] |
158 |
except KeyError: |
159 |
res = self._get_next_number_from_ldap(username) |
160 |
self._mem_store[username] = res |
161 |
else: |
162 |
res = self._get_next_number_from_ldap(username) |
163 |
return res |
164 |
|
140 |
def get_and_raise_number(self, username): |
165 |
def get_and_raise_number(self, username): |
141 |
assert isinstance(username, basestring) |
166 |
assert isinstance(username, basestring) |
142 |
cur = self.get_next_number(username) |
167 |
cur = self.get_next_number(username) |
143 |
next = int(cur) + 1 |
168 |
next = int(cur) + 1 |
144 |
self.connection.modify( |
169 |
if self.dry_run: |
145 |
"cn={},cn=unique-usernames,cn=ucsschool,cn=univention,{}".format( |
170 |
self._mem_store[username] = str(next) |
146 |
escape_dn_chars(username), self.connection.base), |
171 |
else: |
147 |
[("ucsschoolUsernameNextNumber", cur, str(next))] |
172 |
self.connection.modify( |
148 |
) |
173 |
"cn={},cn=unique-usernames,cn=ucsschool,cn=univention,{}".format( |
|
|
174 |
escape_dn_chars(username), self.connection.base), |
175 |
[("ucsschoolUsernameNextNumber", cur, str(next))] |
176 |
) |
149 |
return cur |
177 |
return cur |
150 |
|
178 |
|
151 |
def remove_bad_chars(self, name): |
179 |
def remove_bad_chars(self, name): |