# Source code for pyams_file.property

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'


# import standard library

# import interfaces
from pyams_file.interfaces import IFile, IFileInfo, IFileFieldContainer, DELETED_FILE
from z3c.form.interfaces import NOT_CHANGED
from zope.schema.interfaces import IField

# import packages
from pyams_file.file import FileFactory
from pyams_utils.adapter import get_annotation_adapter
from pyramid.threadlocal import get_current_registry
from zope.interface import alsoProvides
from zope.lifecycleevent import ObjectCreatedEvent, ObjectRemovedEvent, ObjectAddedEvent
from zope.location.location import locate


FILE_CONTAINER_ATTRIBUTES = 'pyams_file.file.attributes'

_marker = object()


class FileProperty(object):
    """Property class used to handle files

    Data descriptor which stores a file object into the ``__dict__`` of its
    owner instance, under the property name. Assigned values which do not
    provide ``IFile`` are first converted to a file object through a factory,
    and lifecycle events are notified to the current registry when the stored
    value changes.
    """

    def __init__(self, field, name=None, klass=None, **args):
        """Initialize the property

        :param field: schema field describing the attribute; it must
            provide ``IField``
        :param name: key used to store the value into instance ``__dict__``;
            defaults to ``field.__name__``
        :param klass: optional factory called to build the file object from
            a raw value; ``FileFactory`` is used when it is not set
        :param args: extra keyword arguments given to the factory
        :raises ValueError: if ``field`` does not provide ``IField``
        """
        if not IField.providedBy(field):
            raise ValueError("Provided field must implement IField interface...")
        if name is None:
            name = field.__name__
        self.__field = field
        self.__name = name
        self.__klass = klass
        self.__args = args

    def __get__(self, instance, klass):
        """Get the value stored on ``instance``

        Returns the descriptor itself when accessed from the class. When no
        value is stored on the instance, the ``default`` attribute of the
        bound field is returned instead.

        :raises AttributeError: if nothing is stored and the bound field has
            no ``default`` attribute
        """
        if instance is None:
            return self
        value = instance.__dict__.get(self.__name, _marker)
        if value is _marker:
            field = self.__field.bind(instance)
            value = getattr(field, 'default', _marker)
            if value is _marker:
                raise AttributeError(self.__name)
        return value

    def __set__(self, instance, value):
        """Store a new value on ``instance``

        ``value`` may be ``NOT_CHANGED`` (assignment is ignored), ``None``,
        ``DELETED_FILE`` (stored value is removed), an object providing
        ``IFile``, a raw value accepted by the file factory, or a
        ``(filename, data)`` tuple.

        :raises ValueError: if the bound field is read-only and a value is
            already stored on the instance
        """
        if value is NOT_CHANGED:
            return
        registry = get_current_registry()
        if (value is not None) and (value is not DELETED_FILE):
            filename = None
            # file upload data converter returns a tuple containing
            # filename and buffered IO stream extracted from FieldStorage...
            if isinstance(value, tuple):
                filename, value = value
            # initialize file through factory
            if not IFile.providedBy(value):
                factory = self.__klass or FileFactory
                file = factory(value, **self.__args)
                registry.notify(ObjectCreatedEvent(file))
                if not file.get_size():
                    # because factory may read until end of file...
                    value.seek(0)
                    file.data = value
                value = file
            if filename is not None:
                info = IFileInfo(value)
                if info is not None:
                    info.filename = filename
        field = self.__field.bind(instance)
        field.validate(value)
        # 'dict.has_key()' doesn't exist anymore in Python 3: use the 'in'
        # operator, like in the deletion branch below
        if field.readonly and (self.__name in instance.__dict__):
            raise ValueError(self.__name, "Field is readonly")
        old_value = instance.__dict__.get(self.__name, _marker)
        if old_value != value:
            # check for previous value
            if (old_value is not _marker) and (old_value is not None):
                registry.notify(ObjectRemovedEvent(old_value))
            if value is DELETED_FILE:
                if self.__name in instance.__dict__:
                    del instance.__dict__[self.__name]
            else:
                # set name of new value
                name = '++attr++{0}'.format(self.__name)
                if value is not None:
                    locate(value, instance, name)
                instance.__dict__[self.__name] = value
                # store file attributes of instance
                if not IFileFieldContainer.providedBy(instance):
                    alsoProvides(instance, IFileFieldContainer)
                attributes = get_annotation_adapter(instance, FILE_CONTAINER_ATTRIBUTES, set,
                                                    notify=False, locate=False)
                attributes.add(self.__name)
                registry.notify(ObjectAddedEvent(value, instance, name))