Univention Bugzilla – Attachment 8840 Details for
Bug 42531
Check reason of failure of user access to files in samba share
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Requests
|
Help
|
New Account
|
Log In
[x]
|
Forgot Password
Login:
[x]
Check file access per SMB and direct FS acces and try to find the problem cause.
check-smb-access (text/x-python), 28.35 KB, created by
Lukas Oyen
on 2017-05-11 15:28:54 CEST
(
hide
)
Description:
Check file access per SMB and direct FS acces and try to find the problem cause.
Filename:
MIME Type:
Creator:
Lukas Oyen
Created:
2017-05-11 15:28:54 CEST
Size:
28.35 KB
patch
obsolete
>#!/usr/bin/env python ># coding: utf-8 ># ># Univention Samba4 ># Check access problems on samba shares. ># ># Copyright 2017 Univention GmbH ># ># http://www.univention.de/ ># ># All rights reserved. ># ># The source code of this program is made available ># under the terms of the GNU Affero General Public License version 3 ># (GNU AGPL V3) as published by the Free Software Foundation. ># ># Binary versions of this program provided by Univention to you as ># well as other copyrighted, protected or trademarked materials like ># Logos, graphics, fonts, specific documentations and configurations, ># cryptographic keys etc. are subject to a license agreement between ># you and Univention and not subject to the GNU AGPL V3. ># ># In the case you use this program under the terms of the GNU AGPL V3, ># the program is provided in the hope that it will be useful, ># but WITHOUT ANY WARRANTY; without even the implied warranty of ># MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ># GNU Affero General Public License for more details. ># ># You should have received a copy of the GNU Affero General Public ># License with the Debian GNU/Linux or Univention distribution in file ># /usr/share/common-licenses/AGPL-3; if not, see ># <http://www.gnu.org/licenses/>. >from __future__ import print_function > >import os >import sys >import pwd >import grp >import stat >import errno >import getpass >import urlparse >import argparse >import subprocess >import contextlib >import multiprocessing >import Queue as queue > >import samba.smb >import samba.param >import samba.credentials >import samba.samba3.passdb >import samba.dcerpc.security as security > >import univention.lib.s4 >import univention.config_registry > >try: > import posix1e >except ImportError: > print("Failed to import `posix1e`, disabling posix ACL checks.", file=sys.stderr) > posix1e = None > >ucr = univention.config_registry.ConfigRegistry() >ucr.load() > > >SMB_ACCESS_FILE_READ_DATA = 0x00000001 >SMB_ACCESS_FILE_WRITE_DATA = 0x00000002 >SMB_ACCESS_FILE_APPEND_DATA = 0x00000004 >SMB_ACCESS_FILE_EXECUTE = 0x00000020 >SMB_ACCESS_DELETE = 0x00010000 > > >class Credentials(object): > def __init__(self, user_name=None, password=None, load_param=None, credentials=None): > self.user_name = user_name > self.password = password > self.load_param = load_param > self.credentials = credentials > > @classmethod > def from_user_name(cls, user_name, password): > load_param = get_samba_params() > credentials = samba.credentials.Credentials() > credentials.parse_string(user_name) > credentials.set_password(password) > credentials.guess(load_param) > return cls(user_name, password, load_param, credentials) > > >class ShareLookupError(LookupError): > def __init__(self, share, message=None): > msg = "Share {!r} not found.".format(share) > super(ShareLookupError, self).__init__(message or msg) > self.share = share > > >class CheckError(Exception): > pass > > >class SMBError(EnvironmentError, CheckError): > def __init__(self, nt_status_error, message=None): > msg = "SMB error: {!r}".format(nt_status_error) > super(SMBError, self).__init__(message or msg) > self.nt_status_error = nt_status_error > > >class SMBConnectError(SMBError): > def __init__(self, host, share, nt_status_error, message=None): > path = "//" + os.path.join(host, share) > msg = "SMB connection to {!r} failed: {!r}".format(path, nt_status_error) > super(SMBConnectError, self).__init__(nt_status_error, message or msg) > self.host = host > self.share = share > > >class SMBAccessError(SMBConnectError): > def __init__(self, host, share, path, permissions, nt_status_error, message=None): > msg = "SMB access to {!r} failed: {!r}".format(path, nt_status_error) > super(SMBAccessError, self).__init__(host, share, nt_status_error, message or msg) > self.path = path > self.permissions = permissions > > >class FileSystemAccessError(EnvironmentError, CheckError): > def __init__(self, errno, filename, permissions): > strerror = os.strerror(errno) > super(FileSystemAccessError, self).__init__(errno, strerror, filename) > self.file_path = filename > self.permissions = permissions > > @classmethod > def from_error(cls, error, permissions): > return cls(error.errno, error.filename, permissions) > > >class FileAccessError(FileSystemAccessError): > pass > > >class DirectoryAccessError(FileSystemAccessError): > pass > > >class CheckResult(object): > def __init__(self, description, error=None): > self.description = description > self.error = error > > def __str__(self): > return "{} - {}".format(self.description, self.error or "OK") > > @classmethod > @contextlib.contextmanager > def catch_check_error(cls, description): > result = cls(description) > try: > yield result > except CheckError as error: > result.error = error > except: > raise > > def add_description(self, description): > self.description += description > > def is_success(self): > return self.error is None > > def is_error(self): > return not self.is_success() > > >def get_user_name(uid=None): > """ > Get the user name of the current user, or the user with the given `uid`. > """ > return pwd.getpwuid(uid if uid is not None else os.getuid()).pw_name > > >def resolve_smb_path_to_local_path(share_path, user_name=None): > """ > Resolve a SMB URI to the local file path. This only works if the > `share_path` points to the current host and a corresponding share is setup. > > If `user_name` is given, the share lookup also considers the users home > directory (if smb.conf has a `homes` section). > """ > (host_name, share_name, file_path) = split_share_path(share_path) > if host_name not in ("localhost", "127.0.0.1", "::1", get_samba_host()): > raise ValueError("Can only resolve local SMB paths.") > shares = find_samba_shares(share_name=share_name, user_name=user_name) > share = next((share for share in shares), dict()) > if share: > return os.path.join(share.get("path"), file_path) > raise ShareLookupError(share_name) > > >def split_share_path(share_path): > (_scheme, location, path, _query, _fragment) = urlparse.urlsplit(share_path) > path_parts = path.split("/", 2)[1:] > if len(path_parts) == 2: > (share_name, smb_path) = path_parts > elif len(path_parts) == 1: > (share_name, smb_path) = (path_parts[0], "") > else: > (share_name, smb_path) = ("", "") > return (location, share_name, smb_path) > > >def get_samba_host(): > return "{}.{}".format(ucr.get("hostname"), ucr.get("domainname")) > > >def find_samba_shares(share_name=None, user_name=None, options=()): > """ > Find all samba shares configured in `smb.conf`. If `share_name` is given, > only the corresponding share will be yielded. If `user_name` is given and > the `smb.conf` has a `homes` section, the user's home is also considered as > a share. > > This will yield a dictionary per share with the keys `name` (section/share > name) and `path`. Additional configuration keys from the `smb.conf` share > section can be requested by the tuple `options`. > """ > load_param = get_samba_params() > > for service in load_param.services(): > name = service > path = load_param.get("path", service) > > if service == "homes" and user_name is not None: > if path: > raise RuntimeError("smb.conf variable substitution not implemented") > name = user_name > path = pwd.getpwnam(user_name).pw_dir > > if path and not load_param.get("printable", service): > if share_name is not None and share_name != name: > continue > share = {"name": name, "path": path} > share.update((opt, load_param.get(opt, service)) for opt in options) > yield share > > >def get_samba_params(): > load_param = samba.param.LoadParm() > load_param.set("debug level", "0") > if os.getenv("SMB_CONF_PATH") is not None: > load_param.load(os.getenv("SMB_CONF_PATH")) > else: > load_param.load_default() > return load_param > > >def print_until_success(check_generator, verbose=False): > error_generator = continue_while_error(check_generator) > first = next(error_generator, None) > > if first is not None and first.is_error(): > last_error = first.error > print(first.description, "failed") > if verbose: > print(" ", first.error) > for result in error_generator: > if result.is_error(): > last_error = result.error > print(" ", "because", result.description, "failed") > if verbose: > print(" ", result.error) > else: > print(" ", "but", result.description, "succeded") > return last_error > if first.is_success(): > print(first.description, "succeded") > return None > > >def continue_while_error(check_generator): > """ > Yield `CheckResult`s from the given generator, als long as they are > error-results. The first successful result is yielded as well. > """ > check_error = True > while check_error: > result = next(check_generator) > yield result > check_error = result.is_error() > > >def find_shares_from_file(file_path, user_name=None, options=()): > """ > Find local shares that contain the file `file_path`. If `user_name` is > given and the `smb.conf` has a `homes` section, the user's home directory > is regarded as a share. For `options` see `find_samba_shares()`. > """ > for share in find_samba_shares(user_name=user_name, options=options): > for path in generate_path_parents(file_path, yield_argument=True): > if path == os.path.normpath(share.get("path")): > yield share > > >def generate_path_parents(file_path, yield_argument=False): > """ > Generate all parent directories for the given path. If `yield_argument` is > given, `file_path` is yielded as a first value. > """ > path = os.path.normpath(file_path) > > if yield_argument: > yield path > > while True: > path = os.path.dirname(os.path.normpath(path)) > if path in ("", "/", "//"): > break > yield path > > >def smb_connection(credentials, host_name, share_name): > """ > Create a `samba.smb.SMB` connection from a `Credentials` instance. > """ > try: > return samba.smb.SMB(host_name, share_name, lp=credentials.load_param, > creds=credentials.credentials) > except samba.NTSTATUSError as error: > raise SMBConnectError(host_name, share_name, error) > > >def uid_to_sid(uid): > output = subprocess.check_output(["wbinfo", "-U", str(uid)]) > return security.dom_sid(output.strip()) > > >def gid_to_sid(gid): > output = subprocess.check_output(["wbinfo", "-G", str(gid)]) > return security.dom_sid(output.strip()) > > >def uid_exists(uid): > try: > pwd.getpwuid(uid) > except KeyError: > return False > return True > > >def gid_exists(gid): > try: > grp.getgrgid(gid) > except KeyError: > return False > return True > > >def sid_exists(sid): > sid_str = str(sid) > universal_well_known = (security.SID_NULL, security.SID_WORLD, > security.SID_CREATOR_OWNER, security.SID_CREATOR_GROUP) > unmapped_root = ("S-1-22-1-0", "S-1-22-2-0") > known_sid = sid_str in univention.lib.s4.well_known_sids or \ > sid_str in universal_well_known or sid_str in unmapped_root > if known_sid: > return True > > wbinfo = subprocess.Popen(["wbinfo", "-s", str(sid)], > stdout=subprocess.PIPE, stderr=subprocess.PIPE) > wbinfo.communicate() > return wbinfo.poll() == 0 > > >def run_as_user(user_name, generator, *args, **kwargs): > """ > Run a generator function as the user `user_name`. This will fork a new > process and `setuid()` to the given user. All values generated by > `generator(*args, **kwargs)` will be yielded by this generator. > """ > if user_name == get_user_name(): > for value in generator(*args, **kwargs): > yield value > return > > def wrapper(generator_queue): > os.setuid(pwd.getpwnam(user_name).pw_uid) > for value in generator(*args, **kwargs): > generator_queue.put(value) > > generator_queue = multiprocessing.Queue() > process = multiprocessing.Process(target=wrapper, args=(generator_queue,)) > > process.start() > while process.is_alive(): > try: > yield generator_queue.get(True, 1) > except queue.Empty: > pass > process.join() > > while True: > try: > yield generator_queue.get_nowait() > except queue.Empty: > break > > >def get_gids(user_name): > yield pwd.getpwnam(user_name).pw_gid > for group in grp.getgrall(): > if user_name in group.gr_mem: > yield group.gr_gid > > >def is_smb_path(path): > return path.startswith("//") > > >def run_checks(arguments, file_path): > if not arguments.no_smb_client: > run_smb_checks(arguments, file_path) > if not arguments.no_filesystem: > run_fs_checks(arguments, file_path) > if not arguments.no_share_configuration: > user_name = arguments.user > for result in check_share_configuration(user_name, file_path): > print(result.description, "-", "OK" if result.is_success() else "ERROR") > if arguments.verbose and result.is_error(): > print(" ", result.error) > > >def run_smb_checks(arguments, file_path): > user_name = arguments.user > password = arguments.password > privileded_user = arguments.privileged_user > privileded_password = arguments.privileged_password > permissions = arguments.permissions > > credentials = Credentials.from_user_name(user_name, password) > smb_checks = check_smb_connect_and_access(credentials, file_path, permissions) > last_error = print_until_success(smb_checks, arguments.verbose) > > if not arguments.no_nt_acl and last_error and isinstance(last_error, SMBAccessError): > credentials = Credentials.from_user_name(privileded_user, privileded_password) > (host, share, path) = (last_error.host, last_error.share, last_error.path) > print(" ", "Possible problems:") > for permission in last_error.permissions: > access_problem = find_smb_access_problem(user_name, credentials, > host, share, path, permission) > if access_problem: > print(" -", access_problem) > acl_integrity = check_nt_acl_integrity(credentials, host, share, path) > print(" -", "ACL integrity", acl_integrity or "OK") > print() > > >def run_fs_checks(arguments, file_path): > user_name = arguments.user > permissions = arguments.permissions > > fs_checks = check_filesystem_access(user_name, file_path, permissions) > last_error = print_until_success(fs_checks, arguments.verbose) > if not arguments.no_posix_acl and last_error: > print(" ", "Possible problems:") > for permission in last_error.permissions: > access_problem = find_fs_access_problem(user_name, last_error.file_path, permission) > if access_problem: > print(" -", access_problem) > acl_integrity = check_posix_acl_integrity(last_error.file_path) > print(" -", "ACL integrity", acl_integrity or "OK") > print() > > >def check_smb_connect_and_access(credentials, file_path, permissions=None): > """ > Check if the user indentified and authorized by `credentials` can access > `file_path` with the given permissions. On access error, or if no > permissions are given, a connection test is performed. `permissions` may be > a string of "rwx" or any of the combinations or None. > """ > host_name = get_samba_host() > > for share in find_shares_from_file(file_path, user_name=credentials.user_name): > if permissions is not None: > relative_path = file_path.replace(share.get("path"), "").lstrip("/") > yield do_smb_access_check(credentials, host_name, > share.get("name"), relative_path, permissions) > for parent_path in generate_path_parents(relative_path): > yield do_smb_access_check(credentials, host_name, > share.get("name"), parent_path, "x") > > yield do_smb_connect(credentials, host_name, share.get("name")) > yield do_smb_connect(credentials, host_name, "sysvol") > > >def do_smb_access_check(credentials, host_name, share_name, smb_path, permissions): > path = "//" + os.path.join(host_name, share_name, smb_path) > description = "accessing {!r} with permissions {}".format(path, permissions) > > with CheckResult.catch_check_error(description) as result: > smb = smb_connection(credentials, host_name, share_name) > > access_mask = 0 > if "r" in permissions: > access_mask |= SMB_ACCESS_FILE_READ_DATA > if "w" in permissions: > access_mask |= SMB_ACCESS_FILE_WRITE_DATA > access_mask |= SMB_ACCESS_FILE_APPEND_DATA > access_mask |= SMB_ACCESS_DELETE > if "x" in permissions: > # For files, read access is also needed for execute permissions. > # Also see bug #33785 (smb.conf `acl allow execute always = True`). > access_mask |= SMB_ACCESS_FILE_EXECUTE > > try: > fnum = smb.open_file(smb_path or "/", access_mask) > except samba.NTSTATUSError as error: > if "x" in permissions and "r" not in permissions: > result.add_description(" (note, that for x, r is needed)") > raise SMBAccessError(host_name, share_name, smb_path, permissions, error) > else: > smb.close_file(fnum) > return result > > >def do_smb_connect(credentials, host_name, share_name): > description = "connecting to {!r}".format("//" + os.path.join(host_name, share_name)) > with CheckResult.catch_check_error(description) as result: > smb_connection(credentials, host_name, share_name) > return result > > >def find_smb_access_problem(user_name, credentials, host_name, share_name, smb_path, permission): > """ > Find the cause of a SMB access to a path for the user `user_name` by > inspecting the NT ACLs for the file //<host_name>/<share_name>/<smb_path>. > This needs to acquire the NT ACLs for `smb_path`, so `credentials` must be > a sufficient privileged user. > """ > path = "//" + os.path.join(host_name, share_name, smb_path) > client = smb_connection(credentials, host_name, share_name) > > gids = set(get_gids(user_name)) > user_sid = uid_to_sid(pwd.getpwnam(user_name).pw_uid) > group_sids = (security.dom_sid(security.SID_WORLD),) + tuple(gid_to_sid(gid) for gid in gids) > > selection = security.SECINFO_OWNER | security.SECINFO_GROUP | security.SECINFO_DACL > try: > acl = client.get_acl(smb_path, selection, security.SEC_FLAG_MAXIMUM_ALLOWED) > except samba.NTSTATUSError as error: > (_code, message) = error.args > return "No {} access because file is not accessible: {}".format(permission, message) > > if acl.dacl is not None and not acl.dacl.aces: > return "No access because of emtpy DACL to {!r}".format(path) > > if "r" == permission: > access_mask = security.SEC_DIR_LIST | security.SEC_FILE_READ_DATA > elif "w" in permission: > access_mask = security.SEC_DIR_ADD_FILE | security.SEC_DIR_ADD_SUBDIR > access_mask |= security.SEC_FILE_WRITE_DATA | security.SEC_FILE_APPEND_DATA > elif "x" in permission: > access_mask = security.SEC_DIR_TRAVERSE | security.SEC_FILE_EXECUTE > > trustee_map = {security.SID_CREATOR_OWNER: acl.owner_sid, > security.SID_CREATOR_GROUP: acl.group_sid} > > for ace in acl.dacl.aces: > trustee = trustee_map.get(str(ace.trustee), ace.trustee) > if user_sid == trustee or trustee in group_sids: > if ace.type == security.SEC_ACE_TYPE_ACCESS_ALLOWED: > if ace.access_mask == 0: # see `map_canon_ace_perms()` in samba source > msg = "All access explicitly denied for SID {} for {!r}" > return msg.format(trustee, path) > elif ace.access_mask & access_mask: > return "" > elif ace.type == security.SEC_ACE_TYPE_ACCESS_DENIED: > if ace.access_mask & access_mask: > msg = "Access ({}) explicitly denied for SID {} for {!r}" > return msg.format(permission, trustee, path) > return "Access ({}) not explicitly allowed, therefore denied for {}".format(permission, path) > > >def check_nt_acl_integrity(credentials, host_name, share_name, smb_path): > """ > Check the NT ACLs for the file //<host_name>/<share_name>/<smb_path> for > unknown SIDs. This needs to acquire the NT ACLs for `smb_path`, so > `credentials` must be a sufficient privileged user. > """ > client = smb_connection(credentials, host_name, share_name) > selection = security.SECINFO_OWNER | security.SECINFO_GROUP | security.SECINFO_DACL > try: > acl = client.get_acl(smb_path, selection, security.SEC_FLAG_MAXIMUM_ALLOWED) > except samba.NTSTATUSError: > pass > else: > acl_sids = (ace.trustee for ace in acl.dacl.aces) > missing_sids = set(str(sid) for sid in acl_sids if not sid_exists(sid)) > if not sid_exists(acl.owner_sid): > missing_sids.add(str(acl.owner_sid)) > if not sid_exists(acl.group_sid): > missing_sids.add(str(acl.group_sid)) > > if missing_sids: > path = "//" + os.path.join(host_name, share_name, smb_path) > sid_error = ", ".join(missing_sids) > return "ACL for {!r} contains SIDs that do not exist: {}".format(path, sid_error) > return "" > > >def check_filesystem_access(user_name, file_path, permissions): > """ > Check normal file system access to `file_path` as the user `user_name`. > This will `setuid()` to the given user and will fail that is impossible. > """ > def generator(): > yield do_filesystem_access_check(file_path, permissions) > for parent_path in generate_path_parents(file_path): > yield do_filesystem_access_check(parent_path, "x") > for value in run_as_user(user_name, generator): > yield value > > >def do_filesystem_access_check(file_path, permissions): > description = "accessing path {} with permissions {}" > description = description.format(file_path, permissions) > > with CheckResult.catch_check_error(description) as result: > try: > file_stat = os.stat(file_path) > except OSError as error: > raise FileSystemAccessError.from_error(error, permissions) > > if stat.S_ISDIR(file_stat.st_mode): > do_filesystem_dir_check(file_path, permissions) > else: > do_filesystem_file_check(file_path, permissions) > return result > > >def do_filesystem_dir_check(file_path, permissions): > access_mode = os.F_OK > if "r" in permissions: > access_mode |= os.R_OK > if "w" in permissions: > access_mode |= os.W_OK > if "x" in permissions: > access_mode |= os.X_OK > > if os.access(file_path, access_mode): > if "r" in permissions: > try: > os.listdir(file_path) > except OSError as error: > raise DirectoryAccessError.from_error(error, permissions) > return True > raise DirectoryAccessError(errno.EACCES, file_path, permissions) > > >def do_filesystem_file_check(file_path, permissions): > try: > fob = open(file_path, permissions.replace("r", "a"), len(permissions)) > except IOError as error: > raise FileAccessError.from_error(error, permissions) > else: > fob.close() > return True > > >def find_fs_access_problem(user_name, file_path, permission): > """ > Find access problems to file `file_path` for the user `user_name` based on > the posix ACLs for the file. > """ > def get_entry(acl, tag_type): > for entry in acl: > if entry.tag_type == tag_type: > yield entry > > def test_permission(acl_entry): > return acl_entry is not None and \ > ("r" == permission and acl_entry.permset.read) or \ > ("w" == permission and acl_entry.permset.write) or \ > ("x" == permission and acl_entry.permset.execute) > > def denied_reason(acl_entry, tag_type_name): > if acl_entry is None: > return "No {} entry in ACL for {}".format(tag_type_name, file_path) > msg = "No {} permission per {} for {}" > perm_map = {"r": "read", "w": "write", "x": "execute"} > return msg.format(perm_map.get(permission), tag_type_name, file_path) > > try: > file_stat = os.stat(file_path) > except OSError as error: > return "Can not stat file {}: {}".format(file_path, error) > > try: > acl = posix1e.ACL(file=file_path) > except OSError as error: > return "Can not get acl of {}: {}".format(file_path, error) > > uid = pwd.getpwnam(user_name).pw_uid > gids = set(get_gids(user_name)) > > if uid == file_stat.st_uid: > acl_user = next(get_entry(acl, posix1e.ACL_USER_OBJ), None) > if not test_permission(acl_user): > return denied_reason(acl_user, "ACL_USER_OBJ") > > acl_mask = next(get_entry(acl, posix1e.ACL_MASK), None) > matching_acl = (e for e in get_entry(acl, posix1e.ACL_USER) if e.qualifier == uid) > acl_user = next(matching_acl, None) > if acl_user and acl_mask is None: > return "No ACL_MASK on ACL with ACL_USER for {}".format(file_path) > elif acl_user and not test_permission(acl_user): > return denied_reason(acl_user, "ACL_USER") > elif acl_user and not test_permission(acl_mask): > return denied_reason(acl_user, "ACL_MASK") > > acl_group = next(get_entry(acl, posix1e.ACL_GROUP_OBJ), None) > matching_acl_groups = list(e for e in get_entry(acl, posix1e.ACL_GROUP) > if e.qualifier in gids) > if file_stat.st_gid in gids or matching_acl_groups: > if matching_acl_groups and acl_mask is None: > return denied_reason(acl_mask, "ACL_MASK") > elif acl_mask: > grant_access = test_permission(acl_group) > for group in matching_acl_groups: > grant_access |= test_permission(group) > if not grant_access: > return denied_reason(acl_group, "ACL_GROUP/ACL_GROUP_OBJ") > elif not test_permission(acl_mask): > return denied_reason(acl_mask, "ACL_MASK") > elif not test_permission(acl_group): > return denied_reason(acl_group, "ACL_GROUP_OBJ") > > acl_other = next(get_entry(acl, posix1e.ACL_OTHER), None) > if not test_permission(acl_other): > return "No {} access per access rights or ACL rule to {}".format(permission, file_path) > return "" > > >def check_posix_acl_integrity(file_path): > """ > Check the posix ACLs for `file_path` for non existing UIDs/GIDs. > """ > try: > file_stat = os.stat(file_path) > except OSError as error: > return "Can not stat file {}: {}".format(file_path, error) > > try: > acl = posix1e.ACL(file=file_path) > except OSError as error: > return "Can not get acl of {}: {}".format(file_path, error) > > missing_uids = set() > missing_gids = set() > > if not uid_exists(file_stat.st_uid): > missing_uids.add(file_stat.st_uid) > if not gid_exists(file_stat.st_gid): > missing_gids.add(file_stat.st_gid) > for entry in acl: > if entry.tag_type == posix1e.ACL_USER and not uid_exists(entry.qualifier): > missing_uids.add(entry.qualifier) > if entry.tag_type == posix1e.ACL_GROUP and not gid_exists(entry.qualifier): > missing_gids.add(entry.qualifier) > > if missing_uids or missing_gids: > error = "ACL for {} contains UIDs/GIDs that do not exist:".format(file_path) > if missing_uids: > error += " uids: {}".format(", ".join(missing_uids)) > if missing_gids: > error += " gids: {}".format(", ".join(missing_gids)) > return "" > > >def check_share_configuration(user_name, file_path): > nasty_options = { > "access based share enum": False, > "available": True, > "create mask": 0744, > "directory mask": 0755, > "dont descent": None, > "force create mode": 0, > "force directory mode": 0, > "force group": "", > "force unknown acl user": False, > "force user": "", > "hosts deny": [], > "invalid users": [], > "max connections": 0, > "read list": [], > "valid users": [], > } > shares = list(find_shares_from_file(file_path, user_name=user_name, > options=nasty_options.keys())) > > with CheckResult.catch_check_error("checking for nested shares") as result: > if len(shares) > 1: > share_names = ", ".join(share.get("name") for share in shares) > raise CheckError("More than one share for {}: {}".format(file_path, share_names)) > yield result > > for share in shares: > description = "checking share {} for nasty options".format(share.get("name")) > with CheckResult.catch_check_error(description) as result: > nasty = list(option for (option, default) in nasty_options.items() > if share.get(option, default) != default) > if nasty: > nasty_options_str = ", ".join(nasty) > raise CheckError("Possibly nasty options set: {}".format(nasty_options_str)) > yield result > > >def parse_arguments(): > default_user = get_user_name() > parser = argparse.ArgumentParser() > > parser.add_argument("file_path", nargs="+", help="File(s) to check access for.") > parser.add_argument("--user", default=default_user, > help="Run checks as user (default: {}).".format(default_user)) > parser.add_argument("--password", help="User password.") > parser.add_argument("--privileged-user", > help="Privileged user for NT ACL checks (required if --no-nt-acl is not given).") > parser.add_argument("--privileged-password", > help="Privileged user password for NT ACL checks.") > parser.add_argument("--force-local-path", action="store_true", > help="`file_path` a local file path (don't detect SMB path).") > parser.add_argument("--permissions", default="rwx", > help="Check for given permissions (default 'rwx').") > > parser.add_argument("--no-smb-client", action="store_true", > help="Don't check smb client access (implies --no-nt-acl).") > parser.add_argument("--no-nt-acl", action="store_true", > help="Don't check NT ACL access.") > parser.add_argument("--no-filesystem", action="store_true", > help="Don't check basic filesystem access..") > parser.add_argument("--no-posix-acl", action="store_true", > help="Don't check posix ACL access.") > parser.add_argument("--no-share-configuration", action="store_true", > help="Don't check samba share configuration") > parser.add_argument("--verbose", action="store_true", > help="Don't suppress error messages.") > > args = parser.parse_args() > > if args.no_smb_client: > args.no_nt_client = True > if posix1e is None: > args.no_posix_acl = True > > if not args.no_nt_acl and args.privileged_user is None: > parser.error("--privileged-user required, if --no-nt-acl is not given") > if args.user != default_user and os.getuid() != 0: > parser.error("--user can only be given with root permissions") > > if not args.no_smb_client and args.password is None: > args.password = getpass.getpass("Password for {}: ".format(args.user)) > if not args.no_nt_acl and args.privileged_password is None: > args.privileged_password = getpass.getpass("Password for {}: ".format(args.privileged_user)) > > return args > > >def main(): > args = parse_arguments() > for path in args.file_path: > if is_smb_path(path) and not args.force_local_path: > try: > path = resolve_smb_path_to_local_path(path, user_name=args.user) > except ShareLookupError as error: > exit(error.message) > run_checks(args, os.path.abspath(path)) > > >if __name__ == "__main__": > main()
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Raw
Actions:
View
Attachments on
bug 42531
:
8817
|
8838
|
8839
| 8840