Source code for pyams_utils.request

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

"""PyAMS_utils.request module


"""

import functools
import logging

from pyramid.interfaces import IAuthenticationPolicy, IAuthorizationPolicy, IRequestFactory
from pyramid.request import Request
from pyramid.security import Allowed
from pyramid.threadlocal import get_current_registry, get_current_request
from zope.annotation.interfaces import IAnnotations, IAttributeAnnotatable
from zope.interface import alsoProvides

from pyams_utils.interfaces import ICacheKeyValue, MissingRequestError, DISPLAY_CONTEXT_KEY_NAME
from pyams_utils.registry import get_global_registry

__docformat__ = 'restructuredtext'


# Module logger, used by the request cache decorator for its debug traces
LOGGER = logging.getLogger('PyAMS (utils)')

# Unique sentinel used as lookup default, so that a cached None value can be
# told apart from a missing cache entry
_MARKER = object()


class RequestSelector:
    """Interface based request selector

    Subscriber predicate which accepts an event only when its 'request'
    attribute matches at least one of the configured interfaces. A candidate
    matches when its ``providedBy`` method accepts the request; candidates
    without a usable ``providedBy`` (plain classes, for example) are checked
    with ``isinstance`` instead:

    .. code-block:: python

        @subscriber(IBeforeTraverseEvent, request_selector=IPyAMSLayer)
        def before_traverse_event(event):
            '''Handler only called when event.request provides IPyAMSLayer'''
    """

    def __init__(self, ifaces, config):  # pylint: disable=unused-argument
        """Store the interfaces to check

        :param ifaces: a single interface (or class), or a list, tuple or set of them
        :param config: configurator given by the predicate machinery; unused
        """
        # a single value is wrapped so that the predicate can always iterate
        self.interfaces = ifaces if isinstance(ifaces, (list, tuple, set)) else (ifaces,)

    def text(self):
        """Predicate label"""
        return 'request_selector = {0}'.format(str(self.interfaces))

    # the predicate hash is the same callable as its label
    phash = text

    def __call__(self, event):
        """Check if event's request matches one of the selector interfaces

        :param event: the event to check; its 'request' attribute is tested
        :return: True as soon as one interface matches, False otherwise
        """
        for candidate in self.interfaces:
            try:
                matched = candidate.providedBy(event.request)
            except (AttributeError, TypeError):
                # not a real interface: fall back on a class check
                matched = isinstance(event.request, candidate)
            if matched:
                return True
        return False
def request_property(key=None, prefix=None):
    """Define a method decorator used to store result into current request's annotations

    The decorated callable is executed at most once per request and cache key;
    its result is stored into the request data and returned as is on the
    following calls. If no request is currently active, nothing is cached and
    the decorated callable is simply executed.

    :param key: annotations cache key; if *key* is a callable object, it is called
        with the same arguments as the decorated function to get the actual key;
        if the resulting key is empty (or *None*, which is the default), a key is
        built from *prefix* (or the decorated function name), followed by the
        cache key values of the target object, of the positional arguments and
        of the keyword arguments
    :param str prefix: prefix used to build the automatic cache key; if *None*,
        the name of the decorated function is used
    :return: the decorator
    """

    def request_decorator(func):

        def compute(obj, args, kwargs):
            """Get the un-cached value"""
            return func(obj, *args, **kwargs) if callable(func) else func

        # keep name, docstring and signature of the decorated function
        @functools.wraps(func)
        def wrapper(obj, *args, **kwargs):
            request = query_request()
            if request is None:
                # no active request: there is nowhere to store the result
                return compute(obj, args, kwargs)
            cache_key = key(obj, *args, **kwargs) if callable(key) else key
            if not cache_key:
                cache_key = prefix or func.__name__
                # the request itself is never part of the key, as the cache
                # is already stored on this request
                if obj is not request:
                    cache_key += '::{0}'.format(ICacheKeyValue(obj))
                key_args = tuple(arg for arg in args if arg is not request)
                if key_args:
                    cache_key += '::' + '::'.join(ICacheKeyValue(arg) for arg in key_args)
                if kwargs:
                    cache_key += '::' + '::'.join(
                        '{0}={1}'.format(name, ICacheKeyValue(val))
                        for name, val in kwargs.items())
            LOGGER.debug(">>> Looking for request cache key %s", cache_key)
            data = get_request_data(request, cache_key, _MARKER)
            if data is _MARKER:
                LOGGER.debug("<<< no cached value!")
                data = compute(obj, args, kwargs)
                set_request_data(request, cache_key, data)
            else:
                LOGGER.debug("<<< cached value found!")
                LOGGER.debug(" < %r", data)
            return data

        return wrapper

    return request_decorator
class PyAMSRequest(Request):
    """Custom request factory

    The permission check of this request class also gives the context to the
    'effective_principals' method of the authentication policy, so that
    'roles' principals can be retrieved.
    """

    @request_property(key=None)
    def has_permission(self, permission, context=None):
        """Check if the request is granted a permission on a context

        The result goes through the request cache decorator.

        :param permission: the permission to check
        :param context: the checked context; defaults to the request context
        :return: the result of the authorization policy 'permits' call, or an
            'Allowed' value when no authentication policy is registered
        :raise ValueError: if an authentication policy is registered without
            an authorization policy
        """
        target = self.context if context is None else context  # pylint: disable=no-member
        try:
            registry = self.registry
        except AttributeError:
            registry = get_current_registry()
        authentication = registry.queryUtility(IAuthenticationPolicy)
        if authentication is None:
            return Allowed('No authentication policy in use.')
        authorization = registry.queryUtility(IAuthorizationPolicy)
        if authorization is None:
            # should never happen
            raise ValueError('Authentication policy registered without '
                             'authorization policy')
        principals = authentication.effective_principals(self, target)
        return authorization.permits(target, principals, permission)
def get_request(raise_exception=True):
    """Get current request

    :param bool raise_exception: if true (the default), the lack of an active
        request is reported with an exception; otherwise *None* is returned
    :return: the current request
    :raise MissingRequestError: if there is no active request and
        *raise_exception* is true
    """
    current = get_current_request()
    if current is not None or not raise_exception:
        return current
    raise MissingRequestError("No request")
def query_request():
    """Query current request

    :return: the current request, or *None* if there is no active request
    """
    # a missing request is an expected case here: use the non-raising mode of
    # get_request instead of raising and catching MissingRequestError
    return get_request(raise_exception=False)
# pylint: disable=invalid-name,too-many-arguments,attribute-defined-outside-init
def check_request(path='/', environ=None, base_url=None, headers=None,
                  POST=None, registry=None, principal_id=None, **kwargs):
    """Get current request, or create a new blank one if missing

    :param path, environ, base_url, headers, POST, kwargs: arguments given as is
        to the 'blank' method of the request factory when a request is created
    :param registry: registry attached to the created request; defaults to the
        current registry, then to the global registry
    :param principal_id: if set, and if 'pyams_security' can be imported, the
        matching principal is stored as 'principal' on the created request
    :return: the active request if any, otherwise the new blank request
    """
    try:
        return get_request()
    except MissingRequestError:
        if registry is None:
            registry = get_current_registry()
            if registry is None:
                registry = get_global_registry()
        request_factory = registry.queryUtility(IRequestFactory)
        if request_factory is None:
            request_factory = PyAMSRequest
        blank = request_factory.blank(path, environ, base_url, headers, POST, **kwargs)
        blank.registry = registry  # pylint: disable=attribute-defined-outside-init
        if principal_id is None:
            return blank
        try:
            # pylint: disable=import-outside-toplevel
            from pyams_security.utility import get_principal
        except ImportError:
            # security package is optional: leave the request without principal
            return blank
        blank.principal = get_principal(blank, principal_id)
        return blank
def copy_request(request):
    """Create clone of given request, keeping registry and root as well

    :param request: the request to clone
    :return: the copy of the request, with the 'root' of the source request
        (or *None* if it has none) and a registry
    """
    # read the root before copying, it is set back on the clone afterwards
    source_root = getattr(request, 'root', None)
    clone = request.copy()
    if not hasattr(clone, 'registry'):
        fallback = get_current_registry()
        if fallback is None:
            fallback = get_global_registry()
        clone.registry = fallback
    clone.root = source_root
    return clone
def get_annotations(request):
    """Define 'annotations' request property

    The request is first marked as attribute-annotatable, then adapted to get
    its annotations.

    This function is automatically defined as a custom request method on package include.

    :param request: the request for which annotations are required
    :return: the request annotations
    """
    alsoProvides(request, IAttributeAnnotatable)
    annotations = IAnnotations(request)
    return annotations
def get_debug(request):  # pylint: disable=unused-argument
    """Define 'debug' request property

    This function is automatically defined as a custom request method on package include.

    :param request: the request; unused
    :return: a new object whose 'showTAL' and 'sourceAnnotations' flags are
        both disabled
    """

    class Debug:
        """Request debug class"""

        def __init__(self):
            # both debugging flags are off by default
            self.showTAL = self.sourceAnnotations = False

    flags = Debug()
    return flags
def get_request_data(request, key, default=None):
    """Get data associated with request

    :param request: the request containing requested data
    :param str key: request data annotation key
    :param object default: the default value when data is missing
    :return: the requested value, or *default*
    """
    try:
        store = request.annotations
    except (TypeError, AttributeError):
        # 'annotations' is not available on this request: get them directly
        store = get_annotations(request)
    value = store.get(key, default)
    return value
def set_request_data(request, key, value):
    """Associate data with request

    :param request: the request in which to set data
    :param str key: request data annotation key
    :param object value: the value to be set in request annotation
    """
    try:
        store = request.annotations
    except (TypeError, AttributeError):
        # 'annotations' is not available on this request: get them directly
        store = get_annotations(request)
    store[key] = value
def get_display_context(request):
    """Get current display context

    The display context can be used when we generate a page to display an object in the
    context of another one; PyAMS_content package is using this feature to display "shared"
    contents as if they were located inside another site or folder...

    :param request: the request from which the display context is read
    :return: the display context stored into request data, or the request
        context if no display context was set
    """
    # go through get_request_data, like the other accessors of this module, so
    # that a request without 'annotations' attribute is handled the same way
    return get_request_data(request, DISPLAY_CONTEXT_KEY_NAME, request.context)