#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
#
# Univention Management Console
# Decorators for functions in UMC 2.0 modules
#
# Copyright 2012-2016 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""
Convenience decorators for developers of UMC modules
====================================================
Functions exposed by UMC modules often share some logic. They check the
existence and formatting of variables or check permissions. If anything
fails, they react in a similar way. If everything is correct, the real
logic is often as simple as returning one single value.
This module provides functions that can be used to separate repeating
tasks from the actual business logic. This means:
* less time to code
* fewer bugs
* consistent behaviour throughout the UMC in standard cases
Note that the functions defined herein do not cover every corner case during
UMC module development. You are not bound to use them if you need more
flexibility.
"""
import sys
import time
import inspect
import types
import traceback
import functools
from threading import Thread
from functools import partial

import notifier
import notifier.threads

from univention.lib.i18n import Translation
from univention.management.console.modules import UMC_OptionTypeError, UMC_OptionMissing, UMC_CommandError, UMC_OptionSanitizeError, error_handling
from univention.management.console.log import MODULE
from univention.management.console.modules.sanitizers import MultiValidationError, ValidationError, DictSanitizer, ListSanitizer

_ = Translation('univention.management.console').translate
def sanitize(*args, **kwargs):
    """
    Decorator that sanitizes the user input before the decorated
    function runs.

    A sanitizer can validate the input as well as change it. Note that
    a changed value is written back into the request object. This
    should be no problem though.

    If the validation step fails, an error is passed to the user
    instead of executing the function. The validation step should not
    raise anything other than
    :class:`~univention.management.console.modules.sanitizers.ValidationError`
    or
    :class:`~univention.management.console.modules.sanitizers.UnformattedValidationError`
    (one should use the method
    :meth:`~univention.management.console.modules.sanitizers.Sanitizer.raise_validation_error`).

    When a positional argument is given, it is used as the sanitizer
    for a list of options (see :func:`sanitize_list`); otherwise the
    keyword arguments map option names to sanitizers (see
    :func:`sanitize_dict`).

    Some predefined sanitizer classes live in the corresponding module.
    You can also define one yourself, deriving it from
    :class:`~univention.management.console.modules.sanitizers.Sanitizer`::

        class SplitPathSanitizer(Sanitizer):
            def __init__(self):
                super(SplitPathSanitizer, self).__init__(
                    validate_none=True,
                    may_change_value=True)

            def _sanitize(self, value, name, further_fields):
                if value is None:
                    return []
                try:
                    return value.split('/')
                except:
                    self.raise_validation_error('Split failed')

    Before::

        def my_func(self, request):
            var1 = request.options.get('var1')
            var2 = request.options.get('var2', 20)
            try:
                var1 = int(var1)
                var2 = int(var2)
            except (ValueError, TypeError):
                self.finished(request.id, None, 'Cannot convert to int',
                    success=False, status=BAD_REQUEST_INVALID_OPTS)
                return
            if var2 < 10:
                self.finished(request.id, None, 'var2 must be >= 10',
                    success=False, status=BAD_REQUEST_INVALID_OPTS)
                return
            self.finished(request.id, var1 + var2)

    After::

        @sanitize(var1=IntegerSanitizer(required=True),
            var2=IntegerSanitizer(required=True, minimum=10, default=20))
        def add(self, request):
            var1 = request.options.get('var1')  # could now use ['var1']
            var2 = request.options.get('var2')
            self.finished(request.id, var1 + var2)

    The decorator can be combined with other decorators like
    :func:`simple_response` (be careful with the ordering of decorators
    here)::

        @sanitize(var1=IntegerSanitizer(required=True),
            var2=IntegerSanitizer(required=True, minimum=10))
        @simple_response
        def add(self, var1, var2):
            return var1 + var2

    Note that you lose the capability of specifying defaults in
    *@simple_response*. You need to do it in *@sanitize* now.
    """
    # Without a positional sanitizer the keyword arguments themselves
    # describe the attributes of the options dictionary.
    if not args:
        return sanitize_dict(kwargs)
    # Only the first positional argument is used as list sanitizer.
    return sanitize_list(args[0], **kwargs)
def sanitize_list(sanitizer, **kwargs):
    """Return a decorator that sanitizes a list of options.

    The decorated function is wrapped by :func:`_sanitize_list`, which
    receives *sanitizer* and the keyword arguments given here as
    parameters for the list sanitizer.
    """
    def _decorator(function):
        return _sanitize_list(function, sanitizer, kwargs)
    return _decorator
def sanitize_dict(sanitized_attrs, **kwargs):
    """Return a decorator that sanitizes a dictionary of options.

    The decorated function is wrapped by :func:`_sanitize_dict`, which
    receives *sanitized_attrs* (option name -> sanitizer) and the
    keyword arguments given here as parameters for the dict sanitizer.
    """
    def _decorator(function):
        return _sanitize_dict(function, sanitized_attrs, kwargs)
    return _decorator
def _sanitize_dict(function, sanitized_attrs, sanitizer_parameters):
    """Wrap *function* with a DictSanitizer for ``request.options``.

    *sanitizer_parameters* override the parameters used by default
    (``default={}``, ``required=True``, ``may_change_value=True``).
    """
    parameters = dict(default={}, required=True, may_change_value=True)
    parameters.update(sanitizer_parameters)
    options_sanitizer = DictSanitizer(sanitized_attrs, **parameters)
    return _sanitize(function, options_sanitizer)
def _sanitize_list(function, sanitizer, sanitizer_parameters):
    """Wrap *function* with a ListSanitizer for ``request.options``.

    *sanitizer_parameters* override the parameters used by default
    (``default=[]``, ``required=True``, ``may_change_value=True``).
    """
    parameters = dict(default=[], required=True, may_change_value=True)
    parameters.update(sanitizer_parameters)
    options_sanitizer = ListSanitizer(sanitizer, **parameters)
    return _sanitize(function, options_sanitizer)
def _sanitize(function, sanitizer):
    """Wrap *function* so that ``request.options`` is sanitized first.

    The sanitized value replaces ``request.options`` before *function*
    is called. Validation failures are reported as
    :class:`UMC_OptionSanitizeError`, built from the message and the
    ``result()`` of a :class:`MultiValidationError`.
    """
    options_name = 'request.options'

    def _sanitized_options(request):
        # Always report failures as MultiValidationError: a plain
        # ValidationError is wrapped, a MultiValidationError passes through.
        try:
            return sanitizer.sanitize(options_name, {options_name: request.options})
        except MultiValidationError:
            raise
        except ValidationError as exc:
            errors = MultiValidationError()
            errors.add_error(exc, options_name)
            raise errors

    def _response(self, request):
        try:
            request.options = _sanitized_options(request)
        except MultiValidationError as exc:
            raise UMC_OptionSanitizeError(str(exc), exc.result())
        return function(self, request)

    copy_function_meta_data(function, _response)
    return _response
def simple_response(function=None, with_flavor=None, with_progress=False):
    """If your function is as simple as: "Just return some variables"
    this decorator is for you.

    Instead of defining the function

    .. code-block :: python

        def my_func(self, request): pass

    you now define a function with the variables you would expect in
    *request.options*. Default values are supported:

    .. code-block :: python

        @simple_response
        def my_func(self, var1, var2='default'): pass

    The decorator extracts the variables from *request.options*. If a
    variable is not found, it either returns a failure or sets it to a
    default value (if specified by you).

    If you need to get the flavor passed to the function you can do it
    like this::

        @simple_response(with_flavor=True)
        def my_func(self, flavor, var1, var2='default'): pass

    With *with_flavor* set, the flavor is extracted from the *request*.
    You can also set with_flavor='varname', in which case the variable
    name for the flavor is *varname*. *True* means 'flavor'.
    As with ordinary option arguments, you may specify a default value
    for flavor in the function definition::

        @simple_response(with_flavor='module_flavor')
        def my_func(self, flavor='this comes from request.options',
            module_flavor='this is the flavor (and its default value)'): pass

    With *with_progress* set, the function is wrapped with
    :func:`progressed` instead of :func:`respond`.

    Instead of stating at the end of your function

    .. code-block:: python

        self.finished(request.id, some_value)

    you now just

    .. code-block:: python

        return some_value

    Before::

        def my_func(self, request):
            variable1 = request.options.get('variable1')
            variable2 = request.options.get('variable2')
            flavor = request.flavor or 'default flavor'
            if variable1 is None:
                self.finished(request.id, None, message='variable1 is required', success=False)
                return
            if variable2 is None:
                variable2 = ''
            try:
                value = '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor)
            except KeyError:
                self.finished(request.id, None, message='Something went wrong', success=False, status=500)
                return
            self.finished(request.id, value)

    After::

        @simple_response(with_flavor=True)
        def my_func(self, variable1, variable2='', flavor='default_flavor'):
            try:
                return '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor)
            except KeyError:
                raise UMC_CommandError('Something went wrong')
    """
    if function is None:
        # Used as @simple_response(...): hand back the real decorator.
        def _deferred(func):
            return simple_response(func, with_flavor, with_progress)
        return _deferred

    # Innermost layer maps request.options to the function's arguments.
    wrapped = arguments(function)
    if with_flavor:
        wrapped = flavored(wrapped, with_flavor)
    finalize = progressed if with_progress else respond
    wrapped = finalize(wrapped)

    copy_function_meta_data(function, wrapped)
    return wrapped
def flavored(function=None, name='flavor'):
    """Pass ``request.flavor`` to *function* as a keyword argument.

    *name* is the name of that keyword argument; ``True`` selects the
    default name ``'flavor'``. Called without *function*, a decorator
    bound to *name* is returned.
    """
    if function is None:
        def _deferred(func):
            return flavored(func, name=name)
        return _deferred

    # name of flavor argument. default: 'flavor' (if given, of course)
    keyword = 'flavor' if name is True else name

    def _decorated(self, request, *args, **kwargs):
        kwargs[keyword] = request.flavor
        return function(self, request, *args, **kwargs)
    return _decorated
def threaded(function):
def _thread_end(thread, response, self, request):
if isinstance(response, BaseException):
def _func(self, request):
raise thread.exc_info[0], thread.exc_info[1], thread.exc_info[2]
_func = error_handling(_func)
_func(self, request)
return
self.finished(request.id, response)
def _decorated(self, request, *args, **kwargs):
thread = notifier.Callback(function, self, request, *args, **kwargs)
thread_end = notifier.Callback(_thread_end, self, request)
thread = notifier.threads.Simple('@threaded', thread, thread_end)
thread.run()
return _decorated
def progressed(function):
    """Run *function* in a background thread that reports to a progress object.

    A new progress object (``self.new_progress()``) is stored in
    ``request.options['progress']``. The thread finishes it with the
    return value of *function*, or hands it ``sys.exc_info()`` if
    *function* raised. The request itself is answered (via
    :func:`respond`) with ``progress_obj.initialised()``.

    :raises UMC_OptionTypeError: if ``request.options`` is not a dict.
    """
    def _run(self, progress_obj, request, args, kwargs):
        try:
            outcome = function(self, request, *args, **kwargs)
        except:
            # deliberately broad: every failure has to reach the progress object
            progress_obj.exception(sys.exc_info())
        else:
            progress_obj.finish_with_result(outcome)

    def _decorated(self, request, *args, **kwargs):
        if not isinstance(request.options, dict):  # TODO: remove check, we have sanitizers for this
            raise UMC_OptionTypeError(_('Not a "dict"'))
        progress_obj = self.new_progress()
        request.options['progress'] = progress_obj
        worker = Thread(target=_run, args=(self, progress_obj, request, args, kwargs))
        worker.start()
        return progress_obj.initialised()

    return respond(_decorated)
def progressed_iterator(function, progress):
    """Iterate over *function* in a background thread, reporting progress.

    :param function: callable returning an iterable; every item is
        reported to the progress object.
    :param progress: ``True`` (no title, no message), a title string, or
        a ``(title, message)`` pair. A ``'%d'`` in the title is filled
        with the number of request options; the message is a template
        that is formatted with every item (``message % item``).
    :returns: a wrapper that starts the thread and returns
        ``progress_obj.initialised()``.
    """
    def _decorated(self, request, *args, **kwargs):
        number = len(request.options)
        # Defaults cover progress=True, where neither a title nor a
        # message template exists (formerly progress_msg stayed unbound
        # and the worker thread failed with a NameError).
        progress_title = None
        progress_msg = None
        if progress is not True:
            if isinstance(progress, (list, tuple)):
                progress_title, progress_msg = progress
            else:
                progress_title = progress
            if progress_title and '%d' in progress_title:
                progress_title = progress_title % number
        progress_obj = self.new_progress(progress_title, number)

        def _thread(self, request, progress_obj, args, kwargs):
            try:
                for res in function(self, request, *args, **kwargs):
                    # without a template there is no message for the item
                    res_msg = progress_msg % res if progress_msg else None
                    progress_obj.progress(res, res_msg)
            except:
                # deliberately broad: every failure has to reach the progress object
                progress_obj.exception(sys.exc_info())
            else:
                progress_obj.finish()

        thread = Thread(target=_thread, args=[self, request, progress_obj, args, kwargs])
        thread.start()
        return progress_obj.initialised()
    return _decorated
def finisher(function, name=None):
    """Pass a "finish this request" callable to *function*.

    The callable is ``self.finished`` with ``request.id`` already
    bound. It is handed over as keyword argument *name*
    (``'finisher'`` if *name* is empty).
    """
    if not name:
        name = 'finisher'

    def _decorated(self, request, *args, **kwargs):
        finish_request = partial(self.finished, request.id)
        kwargs[name] = finish_request
        return function(self, request, *args, **kwargs)
    return _decorated
def respond(function):  # return_evaluator
    """Finish the request with the return value of *function*.

    If *function* returns a plain Python function, that function is
    wrapped with :func:`threaded` and called with the same arguments
    instead; its outcome then finishes the request.
    """
    def _decorated(self, request, *args, **kwargs):
        outcome = function(self, request, *args, **kwargs)
        if not isinstance(outcome, types.FunctionType):
            self.finished(request.id, outcome)
            return None
        # a returned function is the actual (long running) work
        return threaded(outcome)(self, request, *args, **kwargs)
    return _decorated
def respond_iterator(function):  # generator_response
    """Finish the request with the list of all items *function* yields."""
    def _collect(self, request, *args, **kwargs):
        items = function(self, request, *args, **kwargs)
        return list(items)
    return respond(_collect)
def arguments(function):  # argument_optimizer_dict
    """Call *function* with arguments resolved by :func:`_get_argspec`.

    The wrapper takes ``(self, request, *args, **kwargs)`` and calls
    *function* as ``function(self, *positional, **keywords)``.
    """
    # function must be the real function!
    def _decorated(self, request, *args, **kwargs):
        resolved = _get_argspec(function, request, args, kwargs)
        positional, keywords = resolved
        return function(self, *positional, **keywords)
    return _decorated
def arguments_iterator(function):  # argument_optimizer_list
    """Call *function* with an iterator over per-element argument lists.

    ``request.options`` has to be a list (or tuple) of dicts. For every
    dict the arguments of *function* are resolved by
    :func:`_get_argspec`; *function* is then called with an iterator
    over these argument lists, followed by one ``None`` per argument
    (the real values are obtained by iterating).

    :raises UMC_OptionTypeError: if ``request.options`` is not a
        list/tuple or one of its elements is not a dict.
    """
    # function must be the real function!
    def _decorated(self, request, *args, **kwargs):
        if not isinstance(request.options, (list, tuple)):
            raise UMC_OptionTypeError(_('Not a "list"'))

        options = request.options
        argspec = arginspect(function)
        iterator = argspec.args[1]
        arguments = []
        try:
            for request.options in options:
                if not isinstance(request.options, dict):
                    raise UMC_OptionTypeError(_('Not a "dict"'))
                # placeholder so that the iterator argument is not "missing"
                request.options[iterator] = None
                try:
                    # _get_argspec pops consumed entries from the dict it is
                    # given; a copy keeps e.g. the flavor for later elements
                    single_arguments, keywords = _get_argspec(function, request, args, dict(kwargs))
                finally:
                    del request.options[iterator]
                arguments.append(single_arguments[1:])
            if not options:
                # Empty request: resolve against placeholders only to learn
                # the number of arguments and the keywords to pass on.
                request.options = dict.fromkeys(argspec.args[1:])
                single_arguments, keywords = _get_argspec(function, request, args, dict(kwargs))
        finally:
            # always hand the original list back, even if an element failed
            request.options = options
        # NOTE(review): this logs raw request values at error level, which
        # may include sensitive data -- looks like leftover debugging.
        MODULE.error('### iter=%r, args=%r' % (iterator, arguments))

        # every element resolves to the same number of arguments
        nones = [None] * (len(single_arguments) - 1)
        return function(self, iter(arguments), *nones, **keywords)
    return _decorated
def arguments_iterator_single_values(function):  # argument_optimizer_list_single_values
    """Call *function* with an iterator over the raw request options.

    ``request.options`` has to be a list or tuple. *function* receives
    an iterator over it, followed by one ``None`` per element and the
    keyword arguments of the call.

    :raises UMC_OptionTypeError: if ``request.options`` is not a
        list/tuple.
    """
    # function must be the real function!
    def _decorated(self, request, *args, **kwargs):
        values = request.options
        if not isinstance(values, (list, tuple)):
            raise UMC_OptionTypeError(_('Not a "list"'))
        placeholders = (None,) * len(values)
        return function(self, iter(values), *placeholders, **kwargs)
    return _decorated
def _get_argspec(function, request, args, kwargs):
# Resolve the values *function* is going to be called with.
# For every argument name of *function* (except the first one, "self")
# the value is taken from, in this order: *kwargs*, request.options,
# the default value declared by *function*.
# Returns the tuple (arguments, keywords).
# Raises UMC_OptionTypeError if request.options is not a dict and
# UMC_OptionMissing (with the argument name) if no source has a value.
# NOTE: entries taken from *kwargs* are popped, i.e. the dictionary
# passed in by the caller is modified.
if not isinstance(request.options, dict):
raise UMC_OptionTypeError(_('Not a "dict"'))
# arginspect also honours the "_original_*" attributes of wrapped functions
argspec = arginspect(function)
argument_names = argspec.args
defaults = argspec.defaults
# use defaults as dict
if defaults:
# defaults belong to the last len(defaults) argument names
defaults = dict(zip(argument_names[-len(defaults):], defaults))
else:
defaults = {}
arguments = []
keywords = {}
for arg in argument_names[1:]: # remove self
# 1st choice: explicitly passed keyword argument (consumed by pop)
try:
arguments.append(kwargs.pop(arg))
continue
except KeyError:
pass
# 2nd choice: value sent with the request
try:
arguments.append(request.options[arg])
continue
except KeyError:
pass
# 3rd choice: default value of the function definition
try:
arguments.append(defaults[arg])
continue
except KeyError:
pass
raise UMC_OptionMissing(arg)
# a function declaring *args also gets the positional arguments of the call
if argspec.varargs:
arguments += list(args)
# a function declaring **kwargs also gets every request option that is
# not already bound to a named argument
if argspec.keywords:
keywords.update(dict((k, v) for k, v in request.options.iteritems() if k not in argument_names))
# NOTE(review): the indentation of this file was lost; confirm whether the
# following update belongs to the "if argspec.keywords" block above.
# By now *kwargs* only holds the entries not consumed as named arguments.
keywords.update(kwargs)
return arguments, keywords
def multi_response(function=None, with_flavor=None, single_values=False, progress=False):
    """This decorator acts similar to :func:`simple_response` but
    can handle a list of dicts instead of a single dict.

    Technically another object is passed to the function that you can
    name as you like. You can iterate over this object and get the
    values from each dictionary in *request.options*.

    Default values and flavors are supported.

    You do not return a value, you yield them (and you are supposed to
    yield!)::

        @multi_response
        def my_multi_func(self, iterator, variable1, variable2=''):
            # here, variable1 and variable2 are yet to be initialised
            # i.e. variable1 and variable2 will be None!
            do_some_initial_stuff()
            try:
                for variable1, variable2 in iterator:
                    # now they are set
                    yield '%s_%s' % (self._saved_dict[variable1], variable2)
            except KeyError:
                raise UMC_CommandError('Something went wrong')
            else:
                # only when everything went right...
                do_some_cleanup_stuff()

    The above code will send a list of answers to the client as soon as
    the function is finished (i.e. after *do_some_cleanup_stuff()*)
    filled with values yielded.

    If you have just one variable in your dictionary, do not forget to
    add a comma, otherwise Python will assign the first value a list
    of one element::

        for var, in iterator:
            # now var is set correctly
            pass

    With *single_values* set, the elements of *request.options* are
    passed on as they are (see
    :func:`arguments_iterator_single_values`). With *progress* set, the
    function is iterated by :func:`progressed_iterator`.
    """
    if function is None:
        # Used as @multi_response(...): hand back the real decorator.
        def _deferred(func):
            return multi_response(func, with_flavor, single_values, progress)
        return _deferred

    adapt = arguments_iterator_single_values if single_values else arguments_iterator
    wrapped = adapt(function)
    if with_flavor:
        wrapped = flavored(wrapped, with_flavor)
    if progress:
        wrapped = respond(progressed_iterator(wrapped, progress))
    else:
        wrapped = respond_iterator(wrapped)

    copy_function_meta_data(function, wrapped)
    return wrapped
def arginspect(function):
    """Return the argument specification of *function*.

    The returned object has the attributes ``args``, ``varargs``,
    ``keywords`` and ``defaults``. Attributes named ``_original_args``,
    ``_original_varargs``, ... set on *function* (see
    :func:`copy_function_meta_data`) take precedence over the inspected
    values, so a wrapper can expose the signature of the function it
    wraps.
    """
    # inspect.getargspec is deprecated and missing in newer Python
    # versions; getfullargspec (where available) provides the same
    # information, with the **kwargs name stored as "varkw".
    getfullargspec = getattr(inspect, 'getfullargspec', None)
    if getfullargspec is not None:
        full = getfullargspec(function)
        inspected = {
            'args': full.args,
            'varargs': full.varargs,
            'keywords': full.varkw,
            'defaults': full.defaults,
        }
    else:
        argspec = inspect.getargspec(function)
        inspected = dict((key, getattr(argspec, key)) for key in ('args', 'varargs', 'keywords', 'defaults'))

    spec = {}
    for key, value in inspected.items():
        spec[key] = getattr(function, '_original_%s' % (key,), value)
    return type('argspec', (object,), spec)()
def copy_function_meta_data(original_function, new_function, copy_arg_inspect=False):
    """Make *new_function* look like *original_function*.

    ``__doc__``, ``__name__`` and ``__module__`` are copied. With
    *copy_arg_inspect* set, the argument specification of
    *original_function* is additionally stored in ``_original_*``
    attributes of *new_function*, where :func:`arginspect` finds it.
    """
    if copy_arg_inspect:
        # set function attrs to allow another arginspect to get original info
        # (used in @simple_response / @log - combo)
        original_spec = arginspect(original_function)
        for key in ('args', 'varargs', 'keywords', 'defaults'):
            setattr(new_function, '_original_%s' % (key,), getattr(original_spec, key))

    # __doc__: otherwise it would not show up in api and such
    # __name__: otherwise it would be something like "_response"
    # __module__: otherwise it would be "univention.management.console.modules.decorators"
    for attribute in ('__doc__', '__name__', '__module__'):
        setattr(new_function, attribute, getattr(original_function, attribute))
def log(function=None, sensitives=None, customs=None, single_values=False):
    '''Log decorator to be used with
    :func:`simple_response`::

        @simple_response
        @log
        def my_func(self, var1, var2):
            return "%s__%s" % (var1, var2)

    The above example will write two lines into the logfile for the
    module (given that the UCR variable *umc/module/debug/level*
    is set to at least 3)::

        MODULE ( INFO ) : my_func got: var1='value1', var2='value2'
        MODULE ( INFO ) : my_func returned: 'value1__value2'

    The variable names are ordered by appearance and hold the values that
    are actually going to be passed to the function (i.e. after they were
    :func:`sanitize` 'd or set to their default value).
    You may specify the names of sensitive arguments that should not
    show up in log files and custom functions that can alter the
    representation of a certain variable's values (useful for non-standard
    datatypes like regular expressions - you may have used a
    :class:`~univention.management.console.modules.sanitizers.PatternSanitizer`
    )::

        @sanitize(pattern=PatternSanitizer())
        @simple_response
        @log(sensitives=['password'], customs={'pattern':lambda x: x.pattern})
        def count_ucr(self, username, password, pattern):
            return self._ucr_count(username, password, pattern)

    This results in something like::

        MODULE ( INFO ) : count_ucr got: password='********', username='Administrator', pattern='.*'
        MODULE ( INFO ) : count_ucr returned: 650

    The decorator also works with :func:`multi_response`::

        @multi_response
        @log
        def multi_my_func(self, iterator, var1, var2):
            for var1, var2 in iterator:
                yield "%s__%s" % (var1, var2)

    This results in something like::

        MODULE ( INFO ) : multi_my_func got: [var1='value1', var2='value2'], [var1='value3', var2='value4']
        MODULE ( INFO ) : multi_my_func returned: ['value1__value2', 'value3__value4']

    Keyword arguments are passed on to the function but are not logged.
    '''
    if function is None:
        return lambda f: log(f, sensitives, customs, single_values)

    # Work on a copy: adding the masks for the sensitive arguments must
    # not modify the dictionary the caller passed in.
    customs = dict(customs or {})
    for sensitive in sensitives or ():
        customs[sensitive] = lambda x: '********'

    def _log(names, args):
        if single_values:
            args = [args]
        return ['%s=%r' % (name, customs.get(name, lambda x: x)(arg)) for name, arg in zip(names, args)]

    # including self
    names = arginspect(function).args
    name = function.__name__

    # multi_response yields i.e. is generator function
    if inspect.isgeneratorfunction(function):
        # remove self, iterator
        names = names[2:]

        def _response(self, iterator, *args, **kwargs):
            # The iterator may be one-shot: keep its elements so that the
            # function still receives them after they have been logged.
            elements = list(iterator)
            arg_reprs = [arg_repr for arg_repr in (_log(names, element) for element in elements) if arg_repr]
            if arg_reprs:
                MODULE.info('%s got: [%s]' % (name, '], ['.join(', '.join(arg_repr) for arg_repr in arg_reprs)))
            result = []
            for res in function(self, iter(elements), *args, **kwargs):
                result.append(res)
                yield res
            MODULE.info('%s returned: %r' % (name, result))
    else:
        # remove self
        names = names[1:]

        def _response(self, *args, **kwargs):
            arg_repr = _log(names, args)
            if arg_repr:
                MODULE.info('%s got: %s' % (name, ', '.join(arg_repr)))
            result = function(self, *args, **kwargs)
            MODULE.info('%s returned: %r' % (name, result))
            return result

    # copy_arg_inspect: outer decorators have to see the signature of
    # the real function, not the one of _response
    copy_function_meta_data(function, _response, copy_arg_inspect=True)
    return _response
def file_upload(function):
    ''' This decorator restricts requests to be
    UPLOAD-commands. Simple, yet effective.

    :raises UMC_CommandError: if ``request.command`` is not ``'UPLOAD'``.
    '''
    def _response(self, request):
        if request.command == 'UPLOAD':
            return function(self, request)
        raise UMC_CommandError(_('%s can only be used as UPLOAD') % (function.__name__))

    copy_function_meta_data(function, _response)
    return _response
class reloading_ucr(object):
    """Decorator that calls ``ucr.load()`` before the decorated function runs.

    The reload is skipped if the same *ucr* object has been loaded by
    this decorator within the last *timeout* seconds.

    Requires the module level imports of ``functools`` and ``time``.
    """

    # time of the last reload, keyed by id() of the ucr object;
    # class attribute, i.e. shared by all instances of this decorator
    _last_reload = dict()

    def __init__(self, ucr, timeout=0.2):
        self._ucr = ucr
        self._timeout = timeout

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = id(self._ucr)
            # 0 means "never loaded by this decorator"
            last_reload = self._last_reload.get(key, 0)
            if last_reload == 0 or time.time() - last_reload > self._timeout:
                self._ucr.load()
                self._last_reload[key] = time.time()
            return func(*args, **kwargs)
        return wrapper
def require_password(function):
    """Call ``self.require_password()`` before running *function*.

    Whatever ``self.require_password()`` raises prevents *function*
    from being executed. Requires the module level import of
    ``functools``.
    """
    @functools.wraps(function)
    def _decorated(self, request, *args, **kwargs):
        self.require_password()
        return function(self, request, *args, **kwargs)
    return _decorated
# Public API of this module (names exported by "from ... import *").
__all__ = [
    'simple_response',
    'multi_response',
    'sanitize',
    'log',
    'sanitize_list',
    'sanitize_dict',
    'file_upload',
    'reloading_ucr',
    'require_password',
]