Source code for pyams_sequence.utility

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'


# import standard library

# import interfaces
from pyams_sequence.interfaces import ISequentialIntIds, ISequentialIdTarget, ISequentialIdInfo
from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent

# import packages
from pyams_utils.registry import query_utility
from pyramid.events import subscriber
from zope.interface import implementer, Invalid
from zope.intid import IntIds
from zope.schema.fieldproperty import FieldProperty

from pyams_sequence import _


[docs]@implementer(ISequentialIntIds) class SequentialIntIds(IntIds): """Sequential IntIds utility""" prefix = FieldProperty(ISequentialIntIds['prefix']) hex_oid_length = FieldProperty(ISequentialIntIds['hex_oid_length']) _last_oid = FieldProperty(ISequentialIntIds['last_oid']) @property def last_oid(self): return self._last_oid @last_oid.setter def last_oid(self, value): if value < self._last_oid: raise Invalid(_("Can't set last OID to value lower than current one!")) self._last_oid = value def _generateId(self): self._last_oid += 1 return self._last_oid
[docs] def register(self, ob): if not ISequentialIdTarget.providedBy(ob): return None return super(SequentialIntIds, self).register(ob)
[docs] def query_hex_oid(self, obj): oid = self.queryId(obj) if oid is not None: return '{{prefix}}{{obj_prefix}}{{hex_id:0{length}x}}' \ .format(length=self.hex_oid_length) \ .format(prefix=self.prefix or '', obj_prefix=getattr(obj, 'sequence_prefix', ''), hex_id=oid)
[docs] def get_full_oid(self, oid, obj_prefix=None): if oid.startswith('+'): oid = oid[1:].lower() elif self.prefix and oid.startswith(self.prefix): return oid return '{prefix}{obj_prefix}{zeros}{hex_id}'.format(prefix=self.prefix or '', obj_prefix=obj_prefix or '', zeros='0' * (self.hex_oid_length - len(oid)), hex_id=oid.lower() if oid else 0)
[docs] def get_short_oid(self, oid, obj_prefix=None): return '{prefix}{obj_prefix} {hex_id:x}'.format(prefix=self.prefix or '', obj_prefix=obj_prefix or '', hex_id=oid or 0)
[docs] def get_base_oid(self, oid, obj_prefix=None): return '{obj_prefix} {hex_id:x}'.format(obj_prefix=obj_prefix or '', hex_id=oid or 0)
[docs] def get_internal_id(self, oid): if oid.startswith('+'): oid = oid[1:] elif self.prefix and oid.startswith(self.prefix): oid = oid[len(self.prefix):] return int(oid, 16)
[docs] def query_object_from_oid(self, oid): internal_id = self.get_internal_id(oid) return self.queryObject(internal_id)
[docs]@subscriber(IObjectAddedEvent, context_selector=ISequentialIdTarget) def handle_added_intid_target(event): """Handle added sequential ID target""" target = event.object sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', '')) if sequence is not None: info = ISequentialIdInfo(target) if not info.oid: # Objects cloned inside a workflow may share the same ID... info.oid = sequence.register(target) info.hex_oid = sequence.query_hex_oid(target)
[docs]@subscriber(IObjectRemovedEvent, context_selector=ISequentialIdTarget) def handle_removed_intid_target(event): """Handle removed sequential ID target""" target = event.object sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', '')) if sequence is not None: info = ISequentialIdInfo(target) if info.oid: sequence.unregister(target)