Source code for pyams_sequence.reference

#
# Copyright (c) 2008-2018 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'


# import standard library

# import interfaces
from hypatia.interfaces import ICatalog
from pyams_i18n.interfaces import II18n
from pyams_sequence.interfaces import IInternalReference, ISequentialIdInfo, ISequentialIntIds
from pyams_skin.layer import IPyAMSUserLayer
from zope.lifecycleevent.interfaces import IObjectModifiedEvent

try:
    from pyams_workflow.interfaces import IWorkflow, IWorkflowVersions, IWorkflowVersion, \
        IWorkflowState, IWorkflowManagedContent, IWorkflowPublicationInfo, IWorkflowTransitionEvent
except ImportError:
    handle_workflow = False
else:
    handle_workflow = True

# import packages
from hypatia.catalog import CatalogQuery
from hypatia.query import Eq
from pyams_catalog.query import CatalogResultSet
from pyams_utils.registry import get_utility
from pyams_utils.request import check_request
from pyramid.events import subscriber


@subscriber(IObjectModifiedEvent, context_selector=IInternalReference)
def handle_modified_reference(event):
    """Drop the cached target of an internal reference when it is modified.

    Subscribed to ``IObjectModifiedEvent`` for contexts selected by
    ``IInternalReference``.  As soon as one of the event descriptions lists
    ``reference`` among its modified attributes, the ``target`` attribute of
    the modified object is deleted and the scan stops.

    :param event: the modification event; ``event.descriptions`` is scanned
        and ``event.object.target`` is deleted on the first match
    """
    for description in event.descriptions:
        # Not every modification description exposes ``attributes`` (sequence
        # descriptions carry ``keys`` instead): skip those instead of failing
        # with an AttributeError.
        if 'reference' in getattr(description, 'attributes', ()):
            del event.object.target
            return
if handle_workflow:

    # The reference target is stored as a volatile property so that resolved
    # references stay in memory.  It has to be reset when a content is
    # published or retired while its version is greater than 1.

    @subscriber(IWorkflowTransitionEvent)
    def handle_workflow_transition(event):
        """Reset the target of internal references after a workflow transition.

        Nothing is done when the event object has no workflow state, when it
        is in version 1, or when it has no sequential ID information.
        Otherwise the catalog's ``link_reference`` index is queried with the
        object's hexadecimal OID and the ``target`` attribute of every result
        is deleted.

        :param event: the workflow transition event; only ``event.object``
            is read
        """
        state = IWorkflowState(event.object, None)
        if state is None or state.version_id == 1:
            # references are not updated on first version
            return
        info = ISequentialIdInfo(event.object, None)
        if info is None:
            return
        catalog = get_utility(ICatalog)
        criteria = Eq(catalog['link_reference'], info.hex_oid)
        for linked in CatalogResultSet(CatalogQuery(catalog).query(criteria)):
            del linked.target
def get_last_version(content):
    """Get the last available version of a content.

    When workflow support is enabled and the content can be adapted to
    ``IWorkflowVersions``, the content is replaced by the first item returned
    by ``get_last_versions()``.

    :param content: the content to check
    :return: the selected content if it can be adapted to
        ``ISequentialIdInfo``, ``None`` otherwise
    """
    if handle_workflow:
        versions = IWorkflowVersions(content, None)
        if versions is not None:
            content = versions.get_last_versions()[0]
    has_sequence_info = ISequentialIdInfo(content, None) is not None
    return content if has_sequence_info else None
def get_visible_version(content):
    """Get the visible version of a content.

    With workflow support enabled:

    - if the content can be adapted to ``IWorkflowVersions``, the last item of
      the sorted versions which are in one of the workflow's visible states
      is returned, or ``None`` when there is no such version;
    - otherwise, if the content can be adapted to
      ``IWorkflowPublicationInfo``, the content is returned only when
      ``is_visible()`` is true, and ``None`` is returned when it is not.

    In every other case the content is returned unchanged.

    :param content: the content to check
    :return: the visible version, the content itself, or ``None``
    """
    if handle_workflow:
        versions = IWorkflowVersions(content, None)
        if versions is not None:
            visible_states = IWorkflow(content).visible_states
            candidates = versions.get_versions(visible_states, sort=True)
            return candidates[-1] if candidates else None
        # IWorkflowPublicationInfo is only bound when the optional workflow
        # import succeeded, so it is looked up under the same guard.
        publication_info = IWorkflowPublicationInfo(content, None)
        if publication_info is not None and not publication_info.is_visible():
            return None
    return content
def get_version_in_state(content, state):
    """Get the version of a content which is in the given workflow state.

    When workflow support is enabled and the content provides
    ``IWorkflowVersion`` or ``IWorkflowManagedContent``, its versions in the
    requested state are retrieved sorted and, if there are any, the last one
    replaces the content.

    :param content: the content to check
    :param state: the state forwarded to ``get_versions()``
    :return: the selected content if it can be adapted to
        ``ISequentialIdInfo``, ``None`` otherwise
    """
    if handle_workflow:
        is_versioned = (IWorkflowVersion.providedBy(content) or
                        IWorkflowManagedContent.providedBy(content))
        if is_versioned:
            matching = IWorkflowVersions(content).get_versions(state, sort=True)
            if matching:
                content = matching[-1]
    if ISequentialIdInfo(content, None) is None:
        return None
    return content
def get_sequence_dict(version, attribute='title', request=None):
    """Get a mapping holding the OID and a label for the given version.

    :param version: the content version; it is adapted to
        ``ISequentialIdInfo`` and ``II18n``
    :param attribute: name of the attribute queried through ``II18n`` to
        build the label
    :param request: request forwarded to ``query_attribute()``
    :return: a dict whose ``id`` key is the hexadecimal OID and whose ``text``
        key is the attribute value followed by the short OID in parentheses
    """
    sequence = get_utility(ISequentialIntIds)
    info = ISequentialIdInfo(version)
    hex_oid = info.hex_oid
    label = II18n(version).query_attribute(attribute, request=request)
    short_oid = sequence.get_short_oid(info.oid)
    return {
        'id': hex_oid,
        'text': '{0} ({1})'.format(label, short_oid)
    }
def get_sequence_target(oid, state):
    """Get the content matching the given OID.

    The content is looked up through the ``ISequentialIntIds`` utility.  When
    workflow support is enabled and the content provides ``IWorkflowVersion``
    or ``IWorkflowManagedContent``, the first item of its sorted versions in
    the given state is returned instead, if there is one.

    :param oid: the OID passed to ``query_object_from_oid()``
    :param state: the state forwarded to ``get_versions()``
    :return: the version in the given state when available, else the result
        of the OID lookup
    """
    sequence = get_utility(ISequentialIntIds)
    content = sequence.query_object_from_oid(oid)
    if not handle_workflow:
        return content
    if IWorkflowVersion.providedBy(content) or IWorkflowManagedContent.providedBy(content):
        versions = IWorkflowVersions(content).get_versions(state, sort=True)
        if versions:
            return versions[0]
    return content
def get_reference_target(reference, state=None, request=None):
    """Get the target of the given reference OID.

    The catalog's ``oid`` index is queried with the reference.  Then:

    - when ``state`` is given, only the results for which
      ``get_version_in_state()`` returns a true value are kept (the catalog
      results themselves are kept, not the versions found);
    - otherwise each result is replaced by its visible version when the
      request provides ``IPyAMSUserLayer``, or by its last version for any
      other request.

    :param reference: the reference OID; a false value gives ``None`` at once
    :param state: optional workflow state used to filter results
    :param request: optional request; ``check_request()`` supplies one when
        it is ``None`` and no state is given
    :return: the first remaining result, or ``None`` when there is none
    """
    if not reference:
        return None
    catalog = get_utility(ICatalog)
    params = Eq(catalog['oid'], reference)
    results = list(CatalogResultSet(CatalogQuery(catalog).query(params)))
    if not results:
        return None
    if state:
        results = [item for item in results if get_version_in_state(item, state)]
    else:
        if request is None:
            request = check_request()
        if IPyAMSUserLayer.providedBy(request):
            getter = get_visible_version
        else:
            getter = get_last_version
        results = [getter(item) for item in results]
    return results[0] if results else None