#
# Copyright (c) 2008-2018 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
__docformat__ = 'restructuredtext'
# import standard library
# import interfaces
from hypatia.interfaces import ICatalog
from pyams_i18n.interfaces import II18n
from pyams_sequence.interfaces import IInternalReference, ISequentialIdInfo, ISequentialIntIds
from pyams_skin.layer import IPyAMSUserLayer
from zope.lifecycleevent.interfaces import IObjectModifiedEvent
try:
from pyams_workflow.interfaces import IWorkflow, IWorkflowVersions, IWorkflowVersion, \
IWorkflowState, IWorkflowManagedContent, IWorkflowPublicationInfo, IWorkflowTransitionEvent
except ImportError:
handle_workflow = False
else:
handle_workflow = True
# import packages
from hypatia.catalog import CatalogQuery
from hypatia.query import Eq
from pyams_catalog.query import CatalogResultSet
from pyams_utils.registry import get_utility
from pyams_utils.request import check_request
from pyramid.events import subscriber
@subscriber(IObjectModifiedEvent, context_selector=IInternalReference)
def handle_modified_reference(event):
    """Reset the cached target of a modified internal reference

    Subscribed to ``IObjectModifiedEvent`` for contexts selected by
    ``IInternalReference``. As soon as one of the event descriptions lists
    ``'reference'`` among its modified attributes, the ``target`` attribute
    of the modified object is deleted and the handler stops.

    :param event: the modification event; its ``descriptions`` are scanned
        and its ``object`` is the object whose ``target`` is deleted
    """
    for description in event.descriptions:
        # Not every modification description is guaranteed to carry an
        # ``attributes`` member (sequence descriptions expose ``keys``
        # instead), so fall back to an empty tuple instead of failing.
        if 'reference' in getattr(description, 'attributes', ()):
            del event.object.target
            return
if handle_workflow:

    # The reference target is handled as a volatile property to keep
    # references in memory; it must be reset when a content is published
    # or retired while being in a version greater than 1

    @subscriber(IWorkflowTransitionEvent)
    def handle_workflow_transition(event):
        """Reset targets of internal references after a workflow transition

        Nothing is done when the event object has no workflow state, when it
        is in its first version, or when it has no sequential ID info.
        Otherwise, the ``target`` attribute is deleted on every catalog result
        whose ``link_reference`` index equals the hexadecimal OID of the
        event object.

        :param event: the workflow transition event
        """
        content = event.object
        state = IWorkflowState(content, None)
        # don't update references on first version
        if state is None or state.version_id == 1:
            return
        info = ISequentialIdInfo(content, None)
        if info is None:
            return
        catalog = get_utility(ICatalog)
        query = Eq(catalog['link_reference'], info.hex_oid)
        for link in CatalogResultSet(CatalogQuery(catalog).query(query)):
            del link.target
def get_last_version(content):
    """Get the last version of the given content

    When workflow support is available and the content can be adapted to
    ``IWorkflowVersions``, the first item returned by ``get_last_versions()``
    replaces the given content.

    :param content: the content to check
    :return: the selected content if it can be adapted to
        ``ISequentialIdInfo``, ``None`` otherwise
    """
    if handle_workflow:
        history = IWorkflowVersions(content, None)
        if history is not None:
            content = history.get_last_versions()[0]
    return content if ISequentialIdInfo(content, None) is not None else None
def get_visible_version(content):
    """Get the visible version of the given content

    Without workflow support, the content is returned unchanged. Otherwise:

    - if the content can be adapted to ``IWorkflowVersions``, the last of its
      versions (sorted) in one of the workflow visible states is returned, or
      ``None`` when there is no such version;
    - else, if the content can be adapted to ``IWorkflowPublicationInfo``,
      the content is returned only when this info reports it as visible;
    - else the content is returned unchanged.

    :param content: the content to check
    :return: the visible content or version, or ``None``
    """
    if not handle_workflow:
        return content
    versions = IWorkflowVersions(content, None)
    if versions is not None:
        workflow = IWorkflow(content)
        visible = versions.get_versions(workflow.visible_states, sort=True)
        return visible[-1] if visible else None
    publication_info = IWorkflowPublicationInfo(content, None)
    if publication_info is None:
        return content
    return content if publication_info.is_visible() else None
def get_version_in_state(content, state):
    """Get the version of the given content which is in the given state

    When workflow support is available and the content provides
    ``IWorkflowVersion`` or ``IWorkflowManagedContent``, the last of its
    versions (sorted) found in the requested state replaces the content;
    the content is kept as is when no version matches.

    :param content: the content to check
    :param state: workflow state(s) given to ``get_versions``
    :return: the selected content if it can be adapted to
        ``ISequentialIdInfo``, ``None`` otherwise
    """
    is_versioned = handle_workflow and (
        IWorkflowVersion.providedBy(content) or
        IWorkflowManagedContent.providedBy(content))
    if is_versioned:
        matching = IWorkflowVersions(content).get_versions(state, sort=True)
        if matching:
            content = matching[-1]
    if ISequentialIdInfo(content, None) is None:
        return None
    return content
def get_sequence_dict(version, attribute='title', request=None):
    """Get a mapping made of the OID and label of the given version

    :param version: the content version; it must be adaptable to
        ``ISequentialIdInfo``
    :param attribute: name of the I18n attribute used as label
    :param request: request forwarded to the I18n attribute query
    :return: a dict whose ``id`` key is the hexadecimal OID and whose
        ``text`` key is the label followed by the short OID in parentheses
    """
    sequence = get_utility(ISequentialIntIds)
    info = ISequentialIdInfo(version)
    hex_oid = info.hex_oid
    label = II18n(version).query_attribute(attribute, request=request)
    short_oid = sequence.get_short_oid(info.oid)
    return {
        'id': hex_oid,
        'text': '{0} ({1})'.format(label, short_oid)
    }
def get_sequence_target(oid, state):
    """Get the content matching the given OID

    The content is looked up through the ``ISequentialIntIds`` utility. When
    workflow support is available and this content provides
    ``IWorkflowVersion`` or ``IWorkflowManagedContent``, the first of its
    versions (sorted) found in the requested state is returned instead, if
    there is any.

    :param oid: the OID of the requested content
    :param state: workflow state(s) given to ``get_versions``
    :return: the matching content or version
    """
    target = get_utility(ISequentialIntIds).query_object_from_oid(oid)
    is_versioned = handle_workflow and (
        IWorkflowVersion.providedBy(target) or
        IWorkflowManagedContent.providedBy(target))
    if is_versioned:
        matching = IWorkflowVersions(target).get_versions(state, sort=True)
        if matching:
            target = matching[0]
    return target
def get_reference_target(reference, state=None, request=None):
    """Get the target of the given reference OID

    Catalog entries whose ``oid`` index equals the reference are searched:

    - when a state is given, only the entries for which
      ``get_version_in_state`` returns a true value are kept;
    - otherwise each entry is replaced by the result of
      ``get_visible_version`` when the request provides ``IPyAMSUserLayer``,
      or of ``get_last_version`` for any other request.

    :param reference: the OID of the reference target
    :param state: optional workflow state(s) used to filter catalog results
    :param request: optional request; ``check_request()`` is used to get one
        when no state and no request are given
    :return: the first remaining item, or ``None`` when the reference is
        empty or when nothing was found
    """
    if not reference:
        return None
    catalog = get_utility(ICatalog)
    query = Eq(catalog['oid'], reference)
    candidates = list(CatalogResultSet(CatalogQuery(catalog).query(query)))
    if not candidates:
        return None
    if state:
        candidates = [item for item in candidates
                      if get_version_in_state(item, state)]
    else:
        if request is None:
            request = check_request()
        if IPyAMSUserLayer.providedBy(request):
            getter = get_visible_version
        else:
            getter = get_last_version
        candidates = [getter(item) for item in candidates]
    return candidates[0] if candidates else None