#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
"""PyAMS_utils.registry module
This package is used to manage a *local registry*. A local registry is a *site management* component
created automatically on application startup by PyAMS_utils package. It can be used to store and
register components, mainly persistent utilities which are created and configured dynamically by a
site administrator; this can include SQLAlchemy engines, ZEO connections, and several PyAMS
utilities like the security manager, media converter, tasks scheduler and many others.
See :ref:`zca` to get a brief introduction about using a local registry with PyAMS packages.
"""
import logging
import threading
import venusian
from ZODB.POSException import POSError
from pyramid.events import subscriber
from pyramid.interfaces import INewRequest
from pyramid.threadlocal import get_current_registry as get_request_registry, manager
from zope.component.globalregistry import getGlobalSiteManager
from zope.component.interfaces import ISite
from zope.interface import implementedBy, providedBy
from zope.interface.interfaces import ComponentLookupError
from zope.traversing.interfaces import IBeforeTraverseEvent
# Markup format used by the docstrings of this module
__docformat__ = 'restructuredtext'
# Module-level logger, registered under the 'PyAMS (utils)' logger name
LOGGER = logging.getLogger('PyAMS (utils)')
class LocalRegistry(threading.local):
    """Thread-local holder of the current local registry

    The local registry gives access to persistent utilities which are
    registered and stored into the ZODB. As the class extends
    :class:`threading.local`, the stored value is specific to each thread.
    """

    # Class-level default: a thread which never stored a registry gets None
    _registry = None

    def set_registry(self, registry):
        """Store *registry* as the local registry of the calling thread"""
        self._registry = registry

    def get_registry(self):
        """Return the local registry of the calling thread, or None if none was stored"""
        return self._registry
# Module-level LocalRegistry instance read and written by the registry helpers of this module
local_registry = LocalRegistry() # pylint: disable=invalid-name
def get_local_registry():
    """Return the local registry of the current thread

    The local registry is defined automatically while a site manager is traversed;
    the returned value is the one currently held by the module-level ``local_registry``.
    """
    current = local_registry.get_registry()
    return current
def set_local_registry(registry):
    """Store *registry* as the local registry of the current thread

    :param registry: the registry to store into the module-level ``local_registry``
    """
    holder = local_registry
    holder.set_registry(registry)
@subscriber(INewRequest)
def handle_new_request(event):  # pylint: disable=unused-argument
    """Subscriber to new request events

    Resets the local registry to None each time a new request is received.

    :param event: the new request event (not used)
    """
    set_local_registry(registry=None)
@subscriber(IBeforeTraverseEvent, context_selector=ISite)
def handle_site_before_traverse(event):
    """Subscriber to "before traverse" events fired on sites

    When an object implementing ISite is traversed, its site manager becomes
    the local registry.

    :param event: the traversal event; its ``object`` attribute is the traversed site
    """
    site = event.object
    set_local_registry(site.getSiteManager())
def get_registries():
    """Iterate over components registries

    Yields the current local registry first (if any), then the registries found in the
    entries of the thread manager stack, from the last entry to the first one.
    A registry is never yielded twice.
    """
    # A list (not a set) is used on purpose: membership relies on equality only
    yielded = []
    local = local_registry.get_registry()
    if local is not None:
        yield local
        yielded.append(local)
    for item in reversed(manager.stack):
        candidate = item.get('registry')
        if candidate is None or candidate in yielded:
            continue
        yield candidate
        yielded.append(candidate)
def get_global_registry():
    """Return the global registry, as given by ``getGlobalSiteManager()``"""
    global_registry = getGlobalSiteManager()
    return global_registry
def get_current_registry(context=None):
    """Return the current registry, or the global one

    The lookup is delegated to Pyramid's ``get_current_registry``; when it gives
    None, the global registry is returned instead.

    :param context: optional context forwarded to Pyramid's registry lookup
    :return: the request registry, or the global registry as a fallback
    """
    registry = get_request_registry(context)
    return get_global_registry() if registry is None else registry
def registered_utilities():
    """Iterate over utilities registrations

    Yields the utilities registrations of every registry given by ``get_registries()``,
    in the same order (local registry first).
    """
    for registry in get_registries():
        yield from registry.registeredUtilities()
def query_utility(provided, name='', default=None):
    """Query utility registered with given interface

    Do a registry lookup for given utility into local registry first, then on each registry
    associated with current thread stack.

    :param Interface provided: the requested interface
    :param str name: name of the requested utility
    :param object default: the default object returned if the requested utility can't be found
    :return: object; the requested object, or *default* if it can't be found in any registry
        or if a ZODB error (*POSError*) occurs during the lookup
    """
    try:
        for registry in get_registries():
            # Query with the implicit None default: forwarding *default* would make the
            # first registry return it, ending the search before other registries are tried
            utility = registry.queryUtility(provided, name)
            if utility is not None:
                return utility
    except POSError:
        # Best-effort lookup: a database error falls back to *default*, but is traced
        LOGGER.debug("ZODB error while looking for utility %s named '%s'",
                     provided, name, exc_info=True)
    return default
def get_utility(provided, name=''):
    """Get utility registered with given interface

    The utility is searched into the local registry first, then into each registry
    associated with the current thread stack; the first match is returned.

    :param Interface provided: the requested interface
    :param str name: name of the requested utility
    :return: object; the requested utility
    :raises ComponentLookupError: if no registry provides the requested utility
    """
    for registry in get_registries():
        found = registry.queryUtility(provided, name)
        if found is None:
            continue
        return found
    raise ComponentLookupError(provided, name)
def get_utilities_for(interface):
    """Iterate over utilities registered with given interface, as (name, util) tuples

    Matching utilities are taken from the local registry first, then from each registry
    associated with the current thread stack.

    :param Interface interface: the requested interface
    """
    for registry in get_registries():
        yield from registry.getUtilitiesFor(interface)
def get_all_utilities_registered_for(interface):  # pylint: disable=invalid-name
    """Get the list of utilities registered for given interface

    Matching utilities are collected from the local registry first, then from each registry
    associated with the current thread stack.

    :param Interface interface: the requested interface
    :return: list; utilities found in all registries, in lookup order
    """
    return [
        utility
        for registry in get_registries()
        for utility in registry.getAllUtilitiesRegisteredFor(interface)
    ]
class utility_config:  # pylint: disable=invalid-name
    """Function or class decorator used to register a utility

    The registration is deferred to a Venusian callback (category ``pyams_utility``).
    The utility is registered into the given *registry* or, when none is given, into the
    registry of the configuration attached to the scan context.

    :param str name: default=''; name under which the utility is registered
    :param Interface provides: the interface for which the utility is registered; when
        omitted, the decorated object must implement or provide exactly one interface
    :param registry: optional registry into which the utility is registered

    Please note that a single utility can be registered several times (using several annotations),
    with different names.

    If several utilities are registered for the same interface with the same name, the last
    registered utility will override the previous ones.
    """

    # NOTE(review): presumably kept as a class attribute so that it can be replaced
    # (e.g. in tests) -- confirm
    venusian = venusian

    def __init__(self, **settings):
        # Decorator arguments are stored as instance attributes
        self.__dict__.update(settings)

    def __call__(self, wrapped):
        settings = self.__dict__.copy()
        # '_depth' is only used to adjust the frame depth given to Venusian
        depth = settings.pop('_depth', 0)

        def callback(context, name, obj):  # pylint: disable=unused-argument
            """Venusian callback doing the actual utility registration"""
            # A class is registered as a factory, any other object as a component
            if isinstance(obj, type):
                factory, component = obj, None
            else:
                factory, component = None, obj

            provides = settings.get('provides')
            if provides is None:
                # No explicit interface: accept the object only if it declares exactly one
                if factory is not None:
                    provides = list(implementedBy(factory))
                else:
                    provides = list(providedBy(component))
                if len(provides) == 1:
                    provides = provides[0]
                else:
                    raise TypeError("Missing 'provides' argument")

            # Identity test (not truthiness): a falsy utility must not be logged as 'None'
            LOGGER.debug("Registering utility %s named '%s' providing %s",
                         component if component is not None else factory,
                         settings.get('name', ''),
                         provides)

            registry = settings.get('registry')
            if registry is None:
                # 'info' is bound below, before Venusian can call this callback
                config = context.config.with_package(info.module)  # pylint: disable=no-member
                registry = config.registry
            registry.registerUtility(component=component, factory=factory,
                                     provided=provides, name=settings.get('name', ''))

        info = self.venusian.attach(wrapped, callback, category='pyams_utility',
                                    depth=depth + 1)
        if info.scope == 'class':  # pylint: disable=no-member
            # if the decorator was attached to a method in a class, or
            # otherwise executed at class scope, we need to set an
            # 'attr' into the settings if one isn't already in there
            if settings.get('attr') is None:
                settings['attr'] = wrapped.__name__

        settings['_info'] = info.codeinfo  # pylint: disable=no-member
        return wrapped