Source code for pyams_utils.zodb

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

""""PyAMS_utils.zodb module

This modules provides several utilities used to manage ZODB connections and persistent objects
"""

from ZEO import DB
from ZODB.interfaces import IConnection
from persistent import Persistent
from persistent.interfaces import IPersistent
from pyramid.events import subscriber
from pyramid_zodbconn import db_from_uri, get_uris
from transaction.interfaces import ITransactionManager
from zope.annotation.interfaces import IAttributeAnnotatable
from zope.container.contained import Contained
from zope.interface import implementer
from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent
from zope.schema import getFieldNames
from zope.schema.fieldproperty import FieldProperty
from zope.schema.vocabulary import SimpleTerm, SimpleVocabulary

from pyams_utils.adapter import adapter_config
from pyams_utils.interfaces import IOptionalUtility
from pyams_utils.interfaces.zeo import IZEOConnection
from pyams_utils.registry import get_global_registry, get_utilities_for
from pyams_utils.vocabulary import vocabulary_config


__docformat__ = 'restructuredtext'


[docs]@adapter_config(context=IPersistent, provides=IConnection) def persistent_connection(obj): """An adapter which gets a ZODB connection from a persistent object We are assuming the object has a parent if it has been created in this transaction. Raises ValueError if it is impossible to get a connection. """ cur = obj while not getattr(cur, '_p_jar', None): cur = getattr(cur, '__parent__', None) if cur is None: return None return cur._p_jar # pylint: disable=protected-access
# IPersistent adapters copied from zc.twist package # also register this for adapting from IConnection
[docs]@adapter_config(context=IPersistent, provides=ITransactionManager) def persistent_transaction_manager(obj): """Transaction manager adapter for persistent objects""" conn = IConnection(obj) # typically this will be # zope.keyreference.persistent.connectionOfPersistent try: return conn.transaction_manager except AttributeError: return conn._txn_mgr # pylint: disable=protected-access
# or else we give up; who knows. transaction_manager is the more # recent spelling. # # ZEO connection management #
[docs]@implementer(IZEOConnection) class ZEOConnection: """ZEO connection object This object can be used to store all settings to be able to open a ZEO connection. Note that this class is required only for tasks specifically targeting a ZEO database connection (like a ZEO packer scheduler task); for generic ZODB operations, just use a :py:class:`ZODBConnection <pyams_utils.zodb.ZODBConnection>` class defined through Pyramid's configuration file. Note that a ZEO connection object is a context manager, so you can use it like this: .. code-block:: python from pyams_utils.zodb import ZEOConnection def my_method(zeo_settings): zeo_connection = ZEOConnection() zeo_connection.update(zeo_settings) with zeo_connection as root: # *root* is then the ZODB root object # do whatever you want with ZEO connection, # which is closed automatically """ _storage = None _db = None _connection = None name = FieldProperty(IZEOConnection['name']) server_name = FieldProperty(IZEOConnection['server_name']) server_port = FieldProperty(IZEOConnection['server_port']) storage = FieldProperty(IZEOConnection['storage']) username = FieldProperty(IZEOConnection['username']) password = FieldProperty(IZEOConnection['password']) server_realm = FieldProperty(IZEOConnection['server_realm']) blob_dir = FieldProperty(IZEOConnection['blob_dir']) shared_blob_dir = FieldProperty(IZEOConnection['shared_blob_dir'])
[docs] def get_settings(self): """Get mapping of all connection settings These settings can be converted to JSON and sent to another process, for example via a ØMQ connection. :return: dict """ result = {} for name in getFieldNames(IZEOConnection): result[name] = getattr(self, name) return result
[docs] def update(self, settings): """Update connection properties with settings as *dict* :param dict settings: typically extracted via the :py:meth:`get_settings` method from another process """ names = getFieldNames(IZEOConnection) for key, value in settings.items(): if key in names: setattr(self, key, value)
[docs] def get_connection(self, wait_timeout=30, get_storage=False): """Create ZEO client connection from current settings :param boolean wait_timeout: connection timeout, in seconds :param boolean get_storage: if *True*, the method should return a tuple containing storage and DB objects; otherwise only DB object is returned :return: tuple containing ZEO client storage and DB object (if *get_storage* argument is set to *True*), or only DB object otherwise """ zdb = DB((self.server_name, self.server_port), storage=self.storage, username=self.username or '', password=self.password or '', realm=self.server_realm, blob_dir=self.blob_dir, shared_blob_dir=self.shared_blob_dir, wait_timeout=wait_timeout) return (zdb.storage, zdb) if get_storage else zdb
@property def connection(self): """Connection getter""" return self._connection # Context manager methods def __enter__(self): self._storage, self._db = self.get_connection(get_storage=True) self._connection = self._db.open_then_close_db_when_connection_closes() return self._connection.root() def __exit__(self, exc_type, exc_val, exc_tb): if self._connection is not None: self._connection.close() if self._storage is not None: self._storage.close()
[docs]@implementer(IOptionalUtility, IAttributeAnnotatable) class ZEOConnectionUtility(ZEOConnection, Persistent, Contained): """Persistent ZEO connection utility"""
[docs]@subscriber(IObjectAddedEvent, context_selector=IZEOConnection) def handle_added_connection(event): """Register new ZEO connection when added""" manager = event.newParent manager.registerUtility(event.object, IZEOConnection, name=event.object.name)
[docs]@subscriber(IObjectRemovedEvent, context_selector=IZEOConnection) def handle_removed_connection(event): """Un-register ZEO connection when deleted""" manager = event.oldParent manager.unregisterUtility(event.object, IZEOConnection, name=event.object.name)
[docs]@vocabulary_config(name='PyAMS ZEO connections') class ZEOConnectionVocabulary(SimpleVocabulary): """ZEO connections vocabulary""" def __init__(self, context=None): # pylint: disable=unused-argument terms = [SimpleTerm(name, title=util.name) for name, util in get_utilities_for(IZEOConnection)] super(ZEOConnectionVocabulary, self).__init__(terms)
[docs]def get_connection_from_settings(settings=None): """Load connection matching registry settings""" if settings is None: settings = get_global_registry().settings # pylint: disable=no-member for name, uri in get_uris(settings): zdb = db_from_uri(uri, name, {}) return zdb.open()
[docs]class ZODBConnection: """ZODB connection wrapper Connections are extracted from Pyramid's settings file in *zodbconn.uri* entries. Note that a ZODB connection object is a context manager, so you can use it like this: .. code-block:: python from pyams_utils.zodb import ZODBConnection def my_method(zodb_name): zodb_connection = ZODBConnection(zodb_name) with zodb_connection as root: # *root* is then the ZODB root object # do whatever you want with ZODB connection, # which is closed automatically """ def __init__(self, name='', settings=None): self.name = name or '' if not settings: settings = get_global_registry().settings # pylint: disable=no-member self.settings = settings _connection = None _db = None _storage = None @property def connection(self): """Connection getter""" return self._connection @property def db(self): # pylint: disable=invalid-name """Database getter""" return self._db @property def storage(self): """Storage getter""" return self._storage
[docs] def get_connection(self): """Load named connection matching registry settings""" for name, uri in get_uris(self.settings): if name == self.name: zdb = db_from_uri(uri, name, {}) connection = self._connection = zdb.open() self._db = connection.db() self._storage = self.db.storage return connection return None
[docs] def close(self): """Connection close""" self._connection.close() self._db.close() self._storage.close()
# Context manager methods def __enter__(self): connection = self.get_connection() return connection.root() def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs]@vocabulary_config(name='PyAMS ZODB connections') class ZODBConnectionVocabulary(SimpleVocabulary): """ZODB connections vocabulary""" def __init__(self, context=None): # pylint: disable=unused-argument settings = get_global_registry().settings # pylint: disable=no-member terms = [SimpleTerm(name, title=name) for name, uri in get_uris(settings)] super(ZODBConnectionVocabulary, self).__init__(terms)
VOLATILE_MARKER = object()
[docs]class volatile_property: # pylint: disable=invalid-name """Property decorator to define volatile attributes into persistent classes""" def __init__(self, fget, doc=None): self.fget = fget self.__doc__ = doc or fget.__doc__ self.__name__ = fget.__name__ self.__module__ = fget.__module__ def __get__(self, inst, cls): if inst is None: return self attrname = '_v_{0}'.format(self.__name__) value = getattr(inst, attrname, VOLATILE_MARKER) if value is VOLATILE_MARKER: value = self.fget(inst) setattr(inst, attrname, value) return value def __delete__(self, inst): attrname = '_v_{0}'.format(self.__name__) if hasattr(inst, attrname): delattr(inst, attrname)