|
Lines 14-27
from samba.tests.subunitrun import SubunitOptions, TestProgram
Link Here
|
| 14 |
import samba.getopt as options |
14 |
import samba.getopt as options |
| 15 |
|
15 |
|
| 16 |
from samba.auth import system_session |
16 |
from samba.auth import system_session |
| 17 |
from samba import ldb |
17 |
from samba import ldb, dsdb |
| 18 |
from samba.samdb import SamDB |
18 |
from samba.samdb import SamDB |
| 19 |
from samba.auth import AuthContext |
19 |
from samba.auth import AuthContext |
| 20 |
from samba.ndr import ndr_unpack |
20 |
from samba.ndr import ndr_unpack |
| 21 |
from samba import gensec |
21 |
from samba import gensec |
| 22 |
from samba.credentials import Credentials |
22 |
from samba.credentials import Credentials, DONT_USE_KERBEROS |
| 23 |
|
23 |
from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP |
| 24 |
import samba.tests |
24 |
import samba.tests |
|
|
25 |
from samba.tests import delete_force |
| 25 |
|
26 |
|
| 26 |
from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES |
27 |
from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES |
| 27 |
|
28 |
|
|
Lines 45-57
url = args[0]
Link Here
|
| 45 |
|
46 |
|
| 46 |
lp = sambaopts.get_loadparm() |
47 |
lp = sambaopts.get_loadparm() |
| 47 |
creds = credopts.get_credentials(lp) |
48 |
creds = credopts.get_credentials(lp) |
|
|
49 |
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) |
| 50 |
|
| 51 |
def closure(vSet, wSet, aSet):
    """Grow wSet in place to its transitive closure over the edges in aSet.

    An edge (start, end) is followed when ``start`` is already in wSet and
    ``end`` is a member of vSet; ``end`` is then added to wSet and followed
    in turn.  Nothing is returned: the result is the mutated wSet.

    The walk is iterative (explicit worklist) so that a long chain of
    nested memberships cannot exhaust the interpreter's recursion limit,
    and each edge is looked at once per reachable start node instead of
    rescanning the full edge set after every addition.

    :param vSet: nodes that are allowed to be added to wSet
    :param wSet: set of start nodes; extended in place
    :param aSet: iterable of (start, end) pairs
    """
    # Index the edges by their start node so each expansion is a lookup.
    successors = {}
    for start, end in aSet:
        successors.setdefault(start, []).append(end)

    pending = list(wSet)
    while pending:
        node = pending.pop()
        for end in successors.get(node, ()):
            if end in vSet and end not in wSet:
                wSet.add(end)
                pending.append(end)
| 48 |
|
58 |
|
| 49 |
class TokenTest(samba.tests.TestCase): |
59 |
class StaticTokenTest(samba.tests.TestCase): |
| 50 |
|
60 |
|
| 51 |
def setUp(self): |
61 |
def setUp(self): |
| 52 |
super(TokenTest, self).setUp() |
62 |
super(StaticTokenTest, self).setUp() |
| 53 |
self.ldb = samdb |
63 |
self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp) |
| 54 |
self.base_dn = samdb.domain_dn() |
64 |
self.base_dn = self.ldb.domain_dn() |
| 55 |
|
65 |
|
| 56 |
res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) |
66 |
res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"]) |
| 57 |
self.assertEquals(len(res), 1) |
67 |
self.assertEquals(len(res), 1) |
|
Lines 155-160
class TokenTest(samba.tests.TestCase):
Link Here
|
| 155 |
print("difference : %s" % sidset1.difference(sidset2)) |
165 |
print("difference : %s" % sidset1.difference(sidset2)) |
| 156 |
self.fail(msg="calculated groups don't match against user PAC tokenGroups") |
166 |
self.fail(msg="calculated groups don't match against user PAC tokenGroups") |
| 157 |
|
167 |
|
|
|
168 |
class DynamicTokenTest(samba.tests.TestCase):
    """Check tokenGroups for a freshly created user that is in three new groups.

    setUp creates one user plus a domain-local, a global and a universal
    security group (the user is made a member of all three), binds as that
    user and records the SIDs of the security token computed by
    samba.auth.user_session().  The tests compare other sources of group
    information against that reference, or against a manual walk of the
    membership graph.

    The class relies on the module-level ``url``, ``lp`` and ``creds``.
    """

    def get_creds(self, target_username, target_password):
        """Build credentials for the given account.

        Domain, realm and workstation are copied from the module-level
        ``creds``; FEATURE_SEAL is added to the gensec features.
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                      | gensec.FEATURE_SEAL)
        return creds_tmp

    def get_ldb_connection(self, target_username, target_password):
        """Open a SamDB connection to ``url`` authenticated as the given account."""
        creds_tmp = self.get_creds(target_username, target_password)
        return SamDB(url=url, credentials=creds_tmp, lp=lp)

    def _create_group_with_test_user(self, name, grouptype):
        """Create group *name*, add the test user to it and return the group SID.

        The group is looked up as cn=<name>,cn=users,<base_dn> to read its
        objectSid before the member is added.
        """
        self.admin_ldb.newgroup(name, grouptype=grouptype)
        res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (name, self.base_dn),
                                    attrs=["objectSid"], scope=ldb.SCOPE_BASE)
        sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])

        self.admin_ldb.add_remove_group_members(name, [self.test_user],
                                                add_members_operation=True)
        return sid

    def setUp(self):
        super(DynamicTokenTest, self).setUp()
        self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)

        self.base_dn = self.admin_ldb.domain_dn()

        self.test_user = "tokengroups_user1"
        self.test_user_pass = "samba123@"
        self.admin_ldb.newuser(self.test_user, self.test_user_pass)

        # One group of each scope, each with the test user as a direct member.
        self.test_group0 = "tokengroups_group0"
        self.test_group0_sid = self._create_group_with_test_user(
            self.test_group0, dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)

        self.test_group1 = "tokengroups_group1"
        self.test_group1_sid = self._create_group_with_test_user(
            self.test_group1, dsdb.GTYPE_SECURITY_GLOBAL_GROUP)

        self.test_group2 = "tokengroups_group2"
        self.test_group2_sid = self._create_group_with_test_user(
            self.test_group2, dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)

        # From here on self.ldb is a connection bound as the test user.
        self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass)

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        # The first rootDSE tokenGroups value is used as the SID of the
        # bound user; it is resolved to the user's DN just below.
        self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))

        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[])
        self.assertEqual(len(res), 1)

        self.test_user_dn = res[0].dn

        session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
                              AUTH_SESSION_INFO_AUTHENTICATED |
                              AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
        session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
                                          session_info_flags=session_info_flags)

        # Reference SID list that the tests compare against.
        token = session.security_token
        self.user_sids = [str(s) for s in token.sids]

    def tearDown(self):
        super(DynamicTokenTest, self).tearDown()
        # Remove the user and the three groups created in setUp.
        for name in (self.test_user, self.test_group0,
                     self.test_group1, self.test_group2):
            delete_force(self.admin_ldb, "CN=%s,%s,%s" %
                         (name, "cn=users", self.base_dn))

    def test_rootDSE_tokenGroups(self):
        """Testing rootDSE tokengroups against internal calculation"""
        if not url.startswith("ldap"):
            self.fail(msg="This test is only valid on ldap")

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        print("Getting tokenGroups from rootDSE")
        tokengroups = [str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))
                       for sid in res[0]['tokenGroups']]

        # Only SIDs reported by the server but missing from the calculated
        # token are treated as a failure (one-directional comparison).
        unexpected = set(tokengroups).difference(set(self.user_sids))
        if unexpected:
            print("token sids don't match")
            print("tokengroups: %s" % tokengroups)
            print("calculated : %s" % self.user_sids)
            print("difference : %s" % unexpected)
            self.fail(msg="calculated groups don't match against rootDSE tokenGroups")

    def test_dn_tokenGroups(self):
        # Compare tokenGroups read from the user object with the calculated token.
        print("Getting tokenGroups from user DN")
        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        dn_tokengroups = [str(ndr_unpack(samba.dcerpc.security.dom_sid, sid))
                          for sid in res[0]['tokenGroups']]

        unexpected = set(dn_tokengroups).difference(set(self.user_sids))
        if unexpected:
            print("token sids don't match")
            print("difference : %s" % unexpected)
            self.fail(msg="calculated groups don't match against user DN tokenGroups")

    def test_pac_groups(self):
        # Run a GSSAPI exchange between an in-process gensec client (test
        # user) and server (machine account), then compare the SIDs of the
        # server-side session token with the calculated token.
        settings = {}
        settings["lp_ctx"] = lp
        settings["target_hostname"] = lp.get("netbios name")

        gensec_client = gensec.Security.start_client(settings)
        gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass))
        gensec_client.want_feature(gensec.FEATURE_SEAL)
        gensec_client.start_mech_by_sasl_name("GSSAPI")

        auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])

        gensec_server = gensec.Security.start_server(settings, auth_context)
        machine_creds = Credentials()
        machine_creds.guess(lp)
        machine_creds.set_machine_account(lp)
        gensec_server.set_credentials(machine_creds)

        gensec_server.want_feature(gensec.FEATURE_SEAL)
        gensec_server.start_mech_by_sasl_name("GSSAPI")

        client_finished = False
        server_finished = False
        # Empty initial token; b"" is the same value as "" on Python 2.
        server_to_client = b""

        # Run the actual call loop.
        while not client_finished and not server_finished:
            if not client_finished:
                print("running client gensec_update")
                (client_finished, client_to_server) = gensec_client.update(server_to_client)
            if not server_finished:
                print("running server gensec_update")
                (server_finished, server_to_client) = gensec_server.update(client_to_server)

        session = gensec_server.session_info()

        token = session.security_token
        pac_sids = [str(s) for s in token.sids]

        unexpected = set(pac_sids).difference(set(self.user_sids))
        if unexpected:
            print("token sids don't match")
            print("difference : %s" % unexpected)
            self.fail(msg="calculated groups don't match against user PAC tokenGroups")

    def _membership_graph(self, include_unlinked=False):
        """Collect the membership graph of all users and groups below base_dn.

        Every edge is a (member, group) pair of casefolded DNs.  Edges come
        from the memberOf values of each user/group object and from each
        user's primaryGroupID (resolved through a <SID=...> search).

        :param include_unlinked: also put objects that have no memberOf
            value into the vertex set
        :return: tuple (vSet, aSet) of vertex set and edge set
        """
        vSet = set()
        aSet = set()

        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
                                    expression="(|(objectclass=user)(objectclass=group))",
                                    attrs=["memberOf"])
        for obj in res:
            first = obj.dn.get_casefold()
            if include_unlinked:
                vSet.add(first)
            if "memberOf" in obj:
                for dn in obj["memberOf"]:
                    second = ldb.Dn(self.admin_ldb, dn).get_casefold()
                    aSet.add((first, second))
                    vSet.add(first)
                    vSet.add(second)

        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
                                    expression="(objectclass=user)",
                                    attrs=["primaryGroupID"])
        domain_sid = self.admin_ldb.get_domain_sid()
        for obj in res:
            if "primaryGroupID" in obj:
                # The primary group is stored as a RID relative to the domain SID.
                sid = "%s-%d" % (domain_sid, int(obj["primaryGroupID"][0]))
                res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
                                             attrs=[])
                first = obj.dn.get_casefold()
                second = res2[0].dn.get_casefold()

                aSet.add((first, second))
                vSet.add(first)
                vSet.add(second)

        return vSet, aSet

    def _token_group_dns(self, attr):
        """Read SID attribute *attr* from the test user and resolve it to DNs.

        The attribute is read over the test user's own connection; each SID
        is then resolved with the admin connection.

        :return: set of casefolded DNs
        """
        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[attr])
        self.assertEqual(len(res), 1)

        dns = set()
        for sid in res[0][attr]:
            sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
            res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
                                         attrs=[])
            dns.add(res3[0].dn.get_casefold())
        return dns

    def test_tokenGroups_manual(self):
        # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
        # and compare the result
        vSet, aSet = self._membership_graph()

        user = self.test_user_dn.get_casefold()
        wSet = set()
        wSet.add(user)
        closure(vSet, wSet, aSet)
        # The user itself is the start node, not one of its groups.
        wSet.remove(user)

        tokenGroupsSet = self._token_group_dns("tokenGroups")

        if len(wSet.difference(tokenGroupsSet)):
            self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet))

        if len(tokenGroupsSet.difference(wSet)):
            self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))

    def filtered_closure(self, wSet, filter_grouptype):
        """Extend wSet in place by the memberships reachable through one group type.

        The membership graph is rebuilt from the directory on every call.
        A vertex may be entered when it is not a group at all, or when it
        is a group whose groupType (masked to 32 bits) equals
        filter_grouptype.

        :param wSet: set of casefolded start DNs; extended in place
        :param filter_grouptype: groupType value that groups must have
        """
        vSet, aSet = self._membership_graph(include_unlinked=True)

        uSet = set()
        for v in vSet:
            res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE,
                                              attrs=["groupType"],
                                              expression="objectClass=group")
            if len(res_group) == 1:
                # groupType is masked to 32 bits before the comparison.
                if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype):
                    uSet.add(v)
            else:
                # Not a group: always eligible.
                uSet.add(v)

        closure(uSet, wSet, aSet)

    def test_tokenGroupsGlobalAndUniversal_manual(self):
        # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
        # and compare the result

        # The variable names come from MS-ADTS May 15, 2014

        user = self.test_user_dn.get_casefold()

        S = set()
        S.add(user)

        self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)

        T = set()
        # Not really a SID, we do this on DNs...
        for sid in S:
            X = set()
            X.add(sid)
            self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP)

            T = T.union(X)

        T.remove(user)

        tokenGroupsSet = self._token_group_dns("tokenGroupsGlobalAndUniversal")

        if len(T.difference(tokenGroupsSet)):
            self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet))

        if len(tokenGroupsSet.difference(T)):
            self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T))
| 158 |
|
490 |
|
| 159 |
if not "://" in url: |
491 |
if not "://" in url: |
| 160 |
if os.path.isfile(url): |
492 |
if os.path.isfile(url): |
|
Lines 162-167
if not "://" in url:
Link Here
|
| 162 |
else: |
494 |
else: |
| 163 |
url = "ldap://%s" % url |
495 |
url = "ldap://%s" % url |
| 164 |
|
496 |
|
| 165 |
samdb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp) |
|
|
| 166 |
|
| 167 |
TestProgram(module=__name__, opts=subunitopts) |
497 |
TestProgram(module=__name__, opts=subunitopts) |
| 168 |
- |
|
|