#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
__docformat__ = 'restructuredtext'
# import standard library
# import interfaces
from pyams_sequence.interfaces import ISequentialIntIds, ISequentialIdTarget, ISequentialIdInfo
from zope.lifecycleevent.interfaces import IObjectAddedEvent, IObjectRemovedEvent
# import packages
from pyams_utils.registry import query_utility
from pyramid.events import subscriber
from zope.interface import implementer, Invalid
from zope.intid import IntIds
from zope.schema.fieldproperty import FieldProperty
from pyams_sequence import _
[docs]@implementer(ISequentialIntIds)
class SequentialIntIds(IntIds):
"""Sequential IntIds utility"""
prefix = FieldProperty(ISequentialIntIds['prefix'])
hex_oid_length = FieldProperty(ISequentialIntIds['hex_oid_length'])
_last_oid = FieldProperty(ISequentialIntIds['last_oid'])
@property
def last_oid(self):
return self._last_oid
@last_oid.setter
def last_oid(self, value):
if value < self._last_oid:
raise Invalid(_("Can't set last OID to value lower than current one!"))
self._last_oid = value
def _generateId(self):
self._last_oid += 1
return self._last_oid
[docs] def register(self, ob):
if not ISequentialIdTarget.providedBy(ob):
return None
return super(SequentialIntIds, self).register(ob)
[docs] def query_hex_oid(self, obj):
oid = self.queryId(obj)
if oid is not None:
return '{{prefix}}{{obj_prefix}}{{hex_id:0{length}x}}' \
.format(length=self.hex_oid_length) \
.format(prefix=self.prefix or '',
obj_prefix=getattr(obj, 'sequence_prefix', ''),
hex_id=oid)
[docs] def get_full_oid(self, oid, obj_prefix=None):
if oid.startswith('+'):
oid = oid[1:].lower()
elif self.prefix and oid.startswith(self.prefix):
return oid
return '{prefix}{obj_prefix}{zeros}{hex_id}'.format(prefix=self.prefix or '',
obj_prefix=obj_prefix or '',
zeros='0' * (self.hex_oid_length - len(oid)),
hex_id=oid.lower() if oid else 0)
[docs] def get_short_oid(self, oid, obj_prefix=None):
return '{prefix}{obj_prefix} {hex_id:x}'.format(prefix=self.prefix or '',
obj_prefix=obj_prefix or '',
hex_id=oid or 0)
[docs] def get_base_oid(self, oid, obj_prefix=None):
return '{obj_prefix} {hex_id:x}'.format(obj_prefix=obj_prefix or '',
hex_id=oid or 0)
[docs] def get_internal_id(self, oid):
if oid.startswith('+'):
oid = oid[1:]
elif self.prefix and oid.startswith(self.prefix):
oid = oid[len(self.prefix):]
return int(oid, 16)
[docs] def query_object_from_oid(self, oid):
internal_id = self.get_internal_id(oid)
return self.queryObject(internal_id)
[docs]@subscriber(IObjectAddedEvent, context_selector=ISequentialIdTarget)
def handle_added_intid_target(event):
"""Handle added sequential ID target"""
target = event.object
sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', ''))
if sequence is not None:
info = ISequentialIdInfo(target)
if not info.oid: # Objects cloned inside a workflow may share the same ID...
info.oid = sequence.register(target)
info.hex_oid = sequence.query_hex_oid(target)
[docs]@subscriber(IObjectRemovedEvent, context_selector=ISequentialIdTarget)
def handle_removed_intid_target(event):
"""Handle removed sequential ID target"""
target = event.object
sequence = query_utility(ISequentialIntIds, name=getattr(target, 'sequence_name', ''))
if sequence is not None:
info = ISequentialIdInfo(target)
if info.oid:
sequence.unregister(target)