Univention Bugzilla – Attachment 7894 Details for
Bug 36131
UMC-Server: rewrite decorators
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Requests
|
Help
|
New Account
|
Log In
[x]
|
Forgot Password
Login:
[x]
[patch]
patch (relative to svn r71715)
decorators.py (text/plain), 22.27 KB, created by
Florian Best
on 2016-08-18 16:27 CEST
(
hide
)
Description:
patch (relative to svn r71715)
Filename:
MIME Type:
Creator:
Florian Best
Created:
2016-08-18 16:27 CEST
Size:
22.27 KB
patch
obsolete
>#!/usr/bin/python2.7 ># -*- coding: utf-8 -*- ># ># Univention Management Console ># Decorators for functions in UMC 2.0 modules ># ># Copyright 2012-2016 Univention GmbH ># ># http://www.univention.de/ ># ># All rights reserved. ># ># The source code of this program is made available ># under the terms of the GNU Affero General Public License version 3 ># (GNU AGPL V3) as published by the Free Software Foundation. ># ># Binary versions of this program provided by Univention to you as ># well as other copyrighted, protected or trademarked materials like ># Logos, graphics, fonts, specific documentations and configurations, ># cryptographic keys etc. are subject to a license agreement between ># you and Univention and not subject to the GNU AGPL V3. ># ># In the case you use this program under the terms of the GNU AGPL V3, ># the program is provided in the hope that it will be useful, ># but WITHOUT ANY WARRANTY; without even the implied warranty of ># MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ># GNU Affero General Public License for more details. ># ># You should have received a copy of the GNU Affero General Public ># License with the Debian GNU/Linux or Univention distribution in file ># /usr/share/common-licenses/AGPL-3; if not, see ># <http://www.gnu.org/licenses/>. > >""" >Convenience decorators for developers of UMC modules >==================================================== > >Functions exposed by UMC modules often share some logic. They check the >existance and formatting of variables or check permissions. If anything >fails, they react in a similar way. If everything is correct, the real >logic is often as simple as returning one single value. > >This module provides functions that can be used to separate repeating >tasks from the actual business logic. This means: > > * less time to code > * fewer bugs > * consistent behaviour throughout the UMC in standard cases > >Note that the functions defined herein do not cover every corner case during >UMC module development. You are not bound to use them if you need more >flexibility. >""" > >import sys >import inspect >import types >import traceback >import notifier >import notifier.threads >from threading import Thread >from functools import partial > >from univention.lib.i18n import Translation >_ = Translation('univention.management.console').translate > >from univention.management.console.modules import UMC_OptionTypeError, UMC_OptionMissing, UMC_CommandError, UMC_OptionSanitizeError, error_handling >from univention.management.console.log import MODULE > >from univention.management.console.modules.sanitizers import MultiValidationError, ValidationError, DictSanitizer, ListSanitizer > > >def sanitize(*args, **kwargs): > """ > Decorator that lets you sanitize the user input. > > The sanitize function can be used to validate the input > as well as change it. > > Note that changing a value here will actually alter the request > object. This should be no problem though. > > If the validation step fails an error will be passed to the user > instead of executing the function. This step should not raise > anything other than > :class:`~univention.management.console.modules.sanitizers.ValidationError` > or > :class:`~univention.management.console.modules.sanitizers.UnformattedValidationError` > (one should use the method > :meth:`~univention.management.console.modules.sanitizers.Sanitizer.raise_validation_error`). > > You can find some predefined Sanitize classes in the > corresponding module or you define one yourself, deriving it from > :class:`~univention.management.console.modules.sanitizers.Sanitizer`:: > > class SplitPathSanitizer(Sanitizer): > def __init__(self): > super(SplitPathSanitizer, self).__init__( > validate_none=True, > may_change_value=True) > > def _sanitize(self, value, name, further_fields): > if value is None: > return [] > try: > return value.split('/') > except: > self.raise_validation_error('Split failed') > > Before:: > > def my_func(self, request): > var1 = request.options.get('var1') > var2 = request.options.get('var2', 20) > try: > var1 = int(var1) > var2 = int(var2) > except (ValueError, TypeError): > self.finished(request.id, None, 'Cannot convert to int', > success=False, status=BAD_REQUEST_INVALID_OPTS) > return > if var2 < 10: > self.finished(request.id, None, 'var2 must be >= 10', > success=False, status=BAD_REQUEST_INVALID_OPTS) > return > self.finished(request.id, var1 + var2) > > After:: > > @sanitize(var1=IntegerSanitizer(required=True), > var2=IntegerSanitizer(required=True, minimum=10, default=20)) > def add(self, request): > var1 = request.options.get('var1') # could now use ['var1'] > var2 = request.options.get('var2') > self.finished(request.id, var1 + var2) > > The decorator can be combined with other decorators like > :func:`simple_response` (be careful with ordering of decorators here):: > > @sanitize(var1=IntegerSanitizer(required=True), > var2=IntegerSanitizer(required=True, minimum=10)) > @simple_response > def add(self, var1, var2): > return var1 + var2 > > Note that you lose the capability of specifiying defaults in > *@simple_response*. You need to do it in *@sanitize* now. > """ > if args: > return sanitize_list(args[0], **kwargs) > else: > return sanitize_dict(kwargs) > > >def sanitize_list(sanitizer, **kwargs): > return lambda function: _sanitize_list(function, sanitizer, kwargs) > > >def sanitize_dict(sanitized_attrs, **kwargs): > return lambda function: _sanitize_dict(function, sanitized_attrs, kwargs) > > >def _sanitize_dict(function, sanitized_attrs, sanitizer_parameters): > defaults = {'default': {}, 'required': True, 'may_change_value': True} > defaults.update(sanitizer_parameters) > return _sanitize(function, DictSanitizer(sanitized_attrs, **defaults)) > > >def _sanitize_list(function, sanitizer, sanitizer_parameters): > defaults = {'default': [], 'required': True, 'may_change_value': True} > defaults.update(sanitizer_parameters) > return _sanitize(function, ListSanitizer(sanitizer, **defaults)) > > >def _sanitize(function, sanitizer): > def _response(self, request): > try: > options_name = 'request.options' > try: > request.options = sanitizer.sanitize(options_name, {options_name: request.options}) > except MultiValidationError: > raise > except ValidationError as e: > multi_error = MultiValidationError() > multi_error.add_error(e, options_name) > raise multi_error > except MultiValidationError as e: > raise UMC_OptionSanitizeError(str(e), e.result()) > return function(self, request) > copy_function_meta_data(function, _response) > return _response > > >def simple_response(function=None, with_flavor=None, with_progress=False): > """If your function is as simple as: "Just return some variables" > this decorator is for you. > > Instead of defining the function > > .. code-block :: python > > def my_func(self, response): pass > > you now define a function with the variables you would expect in > *request.options*. Default values are supported: > > .. code-block :: python > > @simple_response > def my_func(self, var1, var2='default'): pass > > The decorator extracts variables from *request.options*. If the > variable is not found, it either returns a failure or sets it to a > default value (if specified by you). > > If you need to get the flavor passed to the function you can do it > like this:: > > @simple_response(with_flavor=True) > def my_func(self, flavor, var1, var2='default'): pass > > With *with_flavor* set, the flavor is extracted from the *request*. > You can also set with_flavor='varname', in which case the variable > name for the flavor is *varname*. *True* means 'flavor'. > As with ordinary option arguments, you may specify a default value > for flavor in the function definition:: > > @simple_response(with_flavor='module_flavor') > def my_func(self, flavor='this comes from request.options', > module_flavor='this is the flavor (and its default value)'): pass > > Instead of stating at the end of your function > > .. code-block:: python > > self.finished(request.id, some_value) > > you now just > > .. code-block:: python > > return some_value > > Before:: > > def my_func(self, request): > variable1 = request.options.get('variable1') > variable2 = request.options.get('variable2') > flavor = request.flavor or 'default flavor' > if variable1 is None: > self.finished(request.id, None, message='variable1 is required', success=False) > return > if variable2 is None: > variable2 = '' > try: > value = '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor) > except KeyError: > self.finished(request.id, None, message='Something went wrong', success=False, status=500) > return > self.finished(request.id, value) > > After:: > > @simple_response(with_flavor=True) > def my_func(self, variable1, variable2='', flavor='default_flavor'): > try: > return '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor) > except KeyError: > raise UMC_CommandError('Something went wrong') > > """ > if function is None: > return lambda f: simple_response(f, with_flavor, with_progress) > > _response = function > _response = arguments(_response) > > if with_flavor: > _response = flavored(_response, with_flavor) > > if with_progress: > _response = progressed(_response) > else: > _response = respond(_response) > > copy_function_meta_data(function, _response) > return _response > > >def flavored(function=None, name='flavor'): > # name of flavor argument. default: 'flavor' (if given, of course) > if function is None: > return lambda f: flavored(f, name=name) > > if name is True: > name = 'flavor' > > def _decorated(self, request, *args, **kwargs): > kwargs[name] = request.flavor > return function(self, request, *args, **kwargs) > return _decorated > > >def threaded(function): > def _thread_end(thread, response, self, request): > if isinstance(response, BaseException): > def _func(self, request): > raise thread.exc_info[0], thread.exc_info[1], thread.exc_info[2] > _func = error_handling(_func) > _func(self, request) > return > self.finished(request.id, response) > > def _decorated(self, request, *args, **kwargs): > thread = notifier.Callback(function, self, request, *args, **kwargs) > thread_end = notifier.Callback(_thread_end, self, request) > thread = notifier.threads.Simple('@threaded', thread, thread_end) > thread.run() > return _decorated > > >def progressed(function): > def _thread(self, function, progress_obj, request, args, kwargs): > try: > result = function(self, request, *args, **kwargs) > except: > progress_obj.exception(sys.exc_info()) > else: > progress_obj.finish_with_result(result) > > def _decorated(self, request, *args, **kwargs): > if not isinstance(request.options, dict): # TODO: remove check, we have sanitizers for this > raise UMC_OptionTypeError(_('Not a "dict"')) > progress_obj = self.new_progress() > request.options['progress'] = progress_obj > > thread = Thread(target=_thread, args=[self, function, progress_obj, request, args, kwargs]) > thread.start() > return progress_obj.initialised() > return respond(_decorated) > > >def progressed_iterator(function, progress): > def _decorated(self, request, *args, **kwargs): > number = len(request.options) > if progress is True: > progress_title = None > else: > if isinstance(progress, (list, tuple)): > progress_title, progress_msg = progress > else: > progress_title, progress_msg = progress, None > if '%d' in progress_title: > progress_title = progress_title % number > progress_obj = self.new_progress(progress_title, number) > > def _thread(self, request, progress_obj, args, kwargs): > try: > for res in function(self, request, *args, **kwargs): > if progress_msg: > res_msg = progress_msg % res > progress_obj.progress(res, res_msg) > except: > progress_obj.exception(sys.exc_info()) > else: > progress_obj.finish() > thread = Thread(target=_thread, args=[self, request, progress_obj, args, kwargs]) > thread.start() > return progress_obj.initialised() > return _decorated > > >def finisher(function, name=None): > name = name or 'finisher' > > def _decorated(self, request, *args, **kwargs): > kwargs[name] = partial(self.finished, request.id) > return function(self, request, *args, **kwargs) > return _decorated > > >def respond(function): # return_evaluator > def _decorated(self, request, *args, **kwargs): > result = function(self, request, *args, **kwargs) > if isinstance(result, types.FunctionType): > result = threaded(result) > return result(self, request, *args, **kwargs) > self.finished(request.id, result) > return _decorated > > >def respond_iterator(function): # generator_response > def _decorated(self, request, *args, **kwargs): > return list(function(self, request, *args, **kwargs)) > return respond(_decorated) > > >def arguments(function): # argument_optimizer_dict > # function must be the real function! > def _decorated(self, request, *args, **kwargs): > arguments, keywords = _get_argspec(function, request, args, kwargs) > return function(self, *arguments, **keywords) > return _decorated > > >def arguments_iterator(function): # argument_optimizer_list > # function must be the real function! > def _decorated(self, request, *args, **kwargs): > if not isinstance(request.options, (list, tuple)): > raise UMC_OptionTypeError(_('Not a "list"')) > options = request.options > arguments = [] > iterator = arginspect(function).args[1] > for request.options in options: > request.options[iterator] = None > single_arguments, keywords = _get_argspec(function, request, args, kwargs) > del request.options[iterator] > arguments.append(single_arguments[1:]) > request.options = options > MODULE.error('### iter=%r, args=%r' % (iterator, arguments)) > nones = [None] * (len(arguments[0])) # FIXME: request with empty list > return function(self, iter(arguments), *nones, **keywords) > return _decorated > > >def arguments_iterator_single_values(function): # argument_optimizer_list_single_values > # function must be the real function! > def _decorated(self, request, *args, **kwargs): > if not isinstance(request.options, (list, tuple)): > raise UMC_OptionTypeError(_('Not a "list"')) > nones = [None] * len(request.options) > return function(self, iter(request.options), *nones, **kwargs) > return _decorated > > >def _get_argspec(function, request, args, kwargs): > if not isinstance(request.options, dict): > raise UMC_OptionTypeError(_('Not a "dict"')) > > argspec = arginspect(function) > argument_names = argspec.args > defaults = argspec.defaults > > # use defaults as dict > if defaults: > defaults = dict(zip(argument_names[-len(defaults):], defaults)) > else: > defaults = {} > > arguments = [] > keywords = {} > > for arg in argument_names[1:]: # remove self > try: > arguments.append(kwargs.pop(arg)) > continue > except KeyError: > pass > try: > arguments.append(request.options[arg]) > continue > except KeyError: > pass > try: > arguments.append(defaults[arg]) > continue > except KeyError: > pass > raise UMC_OptionMissing(arg) > > if argspec.varargs: > arguments += list(args) > if argspec.keywords: > keywords.update(dict((k, v) for k, v in request.options.iteritems() if k not in argument_names)) > keywords.update(kwargs) > return arguments, keywords > > >def multi_response(function=None, with_flavor=None, single_values=False, progress=False): > """This decorator acts similar to :func:`simple_response` but > can handle a list of dicts instead of a single dict. > > Technically another object is passed to the function that you can > name as you like. You can iterate over this object and get the values > from each dictionary in *request.options*. > > Default values and flavors are supported. > > You do not return a value, you yield them (and you are supposed to > yield!):: > > @multi_response > def my_multi_func(self, iterator, variable1, variable2=''): > # here, variable1 and variable2 are yet to be initialised > # i.e. variable1 and variable2 will be None! > do_some_initial_stuff() > try: > for variable1, variable2 in iterator: > # now they are set > yield '%s_%s' % (self._saved_dict[variable1], variable2) > except KeyError: > raise UMC_CommandError('Something went wrong') > else: > # only when everything went right... > do_some_cleanup_stuff() > > The above code will send a list of answers to the client as soon as > the function is finished (i.e. after *do_some_cleanup_stuff()*) > filled with values yielded. > > If you have just one variable in your dictionary, do not forget to > add a comma, otherwise Python will assign the first value a list > of one element:: > > for var, in iterator: > # now var is set correctly > pass > """ > if function is None: > return lambda f: multi_response(f, with_flavor, single_values, progress) > > _response = function > > if single_values: > _response = arguments_iterator_single_values(_response) > else: > _response = arguments_iterator(_response) > > if with_flavor: > _response = flavored(_response, with_flavor) > > if progress: > _response = progressed_iterator(_response, progress) > _response = respond(_response) > else: > _response = respond_iterator(_response) > > copy_function_meta_data(function, _response) > return _response > > >def arginspect(function): > argspec = inspect.getargspec(function) > spec = {} > for key in ('args', 'varargs', 'keywords', 'defaults'): > spec[key] = getattr(function, '_original_%s' % (key,), getattr(argspec, key)) > return type('argspec', (object,), spec)() > > >def copy_function_meta_data(original_function, new_function, copy_arg_inspect=False): > # set function attrs to allow another arginspect to get original info > # (used in @simple_response / @log - combo) > if copy_arg_inspect: > argspec = arginspect(original_function) > for key in ('args', 'varargs', 'keywords', 'defaults'): > setattr(new_function, '_original_%s' % (key,), getattr(argspec, key)) > # copy __doc__, otherwise it would not show up in api and such > new_function.__doc__ = original_function.__doc__ > # copy __name__, otherwise it would be something like "_response" > new_function.__name__ = original_function.__name__ > # copy __module__, otherwise it would be "univention.management.console.modules.decorators" > new_function.__module__ = original_function.__module__ > > >def log(function=None, sensitives=None, customs=None, single_values=False): > '''Log decorator to be used with > :func:`simple_response`:: > > @simple_response > @log > def my_func(self, var1, var2): > return "%s__%s" % (var1, var2) > > The above example will write two lines into the logfile for the > module (given that the the UCR variable *umc/module/debug/level* > is set to at least 3):: > > <date> MODULE ( INFO ) : my_func got: var1='value1', var2='value2' > <date> MODULE ( INFO ) : my_func returned: 'value1__value2' > > The variable names are ordered by appearance and hold the values that > are actually going to be passed to the function (i.e. after they were > :func:`sanitize` 'd or set to their default value). > You may specify the names of sensitive arguments that should not > show up in log files and custom functions that can alter the > representation of a certain variable's values (useful for non-standard > datatypes like regular expressions - you may have used a > :class:`~univention.management.console.modules.sanitizers.PatternSanitizer` > ):: > > @sanitize(pattern=PatternSanitizer()) > @simple_reponse > @log(sensitives=['password'], customs={'pattern':lambda x: x.pattern}) > def count_ucr(self, username, password, pattern): > return self._ucr_count(username, password, pattern) > > This results in something like:: > > <date> MODULE ( INFO ) : count_ucr got: password='********', username='Administrator', pattern='.*' > <date> MODULE ( INFO ) : count_ucr returned: 650 > > The decorator also works with :func:`multi_response`:: > > @multi_response > @log > def multi_my_func(self, iterator, var1, var2): > for var1, var2 in iterator: > yield "%s__%s" % (var1, var2) > > This results in something like:: > > <date> MODULE ( INFO ) : multi_my_func got: [var1='value1', var2='value2'], [var1='value3', var2='value4'] > <date> MODULE ( INFO ) : multi_my_func returned: ['value1__value2', 'value3__value4'] > ''' > if function is None: > return lambda f: log(f, sensitives, customs, single_values) > if customs is None: > customs = {} > if sensitives is None: > sensitives = [] > for sensitive in sensitives: > customs[sensitive] = lambda x: '********' > > def _log(names, args): > if single_values: > args = [args] > return ['%s=%r' % (name, customs.get(name, lambda x: x)(arg)) for name, arg in zip(names, args)] > > # including self > names = arginspect(function).args > name = function.__name__ > # multi_response yields i.e. is generator function > if inspect.isgeneratorfunction(function): > # remove self, iterator > names = names[2:] > > def _response(self, iterator, *args): > arg_reprs = [] > for element in iterator: > arg_repr = _log(names, element) > if arg_repr: > arg_reprs.append(arg_repr) > if arg_reprs: > MODULE.info('%s got: [%s]' % (name, '], ['.join(', '.join(arg_repr) for arg_repr in arg_reprs))) > result = [] > for res in function(self, iterator, *args): > result.append(res) > yield res > MODULE.info('%s returned: %r' % (name, result)) > else: > # remove self > names = names[1:] > > def _response(self, *args): > arg_repr = _log(names, args) > if arg_repr: > MODULE.info('%s got: %s' % (name, ', '.join(arg_repr))) > result = function(self, *args) > MODULE.info('%s returned: %r' % (name, result)) > return result > copy_function_meta_data(function, _response, copy_arg_inspect=True) > return _response > > >def file_upload(function): > ''' This decorator restricts requests to be > UPLOAD-commands. Simple, yet effective ''' > > def _response(self, request): > if request.command != 'UPLOAD': > raise UMC_CommandError(_('%s can only be used as UPLOAD') % (function.__name__)) > return function(self, request) > copy_function_meta_data(function, _response) > return _response > > >class reloading_ucr(object): > > _last_reload = dict() > > def __init__(self, ucr, timeout=0.2): > self._ucr = ucr > self._timeout = timeout > > def __call__(self, func): > @functools.wraps(func) > def wrapper(*args, **kwargs): > last_reload = self._last_reload.get(id(self._ucr), 0) > if last_reload == 0 or time.time() - last_reload > self._timeout: > self._ucr.load() > self._last_reload[id(self._ucr)] = time.time() > return func(*args, **kwargs) > return wrapper > > >def require_password(function): > @functools.wraps(function) > def _decorated(self, request, *args, **kwargs): > self.require_password() > return function(self, request, *args, **kwargs) > return _decorated > > >__all__ = ['simple_response', 'multi_response', 'sanitize', 'log', 'sanitize_list', 'sanitize_dict', 'file_upload', 'reloading_ucr', 'require_password']
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
Actions:
View
|
Diff
Attachments on
bug 36131
:
6158
| 7894