# Source code for pyams_file.file

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'

try:
    import magic
except ImportError:
    magic = None

import os
from io import BytesIO

from BTrees.OOBTree import OOBTree
from PIL import Image
from ZODB.blob import Blob
from ZODB.utils import oid_repr
from persistent import Persistent
from pyramid.events import subscriber
from zope.container.contained import Contained
from zope.copy.interfaces import ICopyHook, ResumeCopy
from zope.interface import implementer
from zope.lifecycleevent import IObjectAddedEvent, IObjectRemovedEvent
from zope.location.interfaces import IContained
from zope.schema.fieldproperty import FieldProperty

from pyams_file.interfaces import FileModifiedEvent, IAudio, IBlobReferenceManager, IFile, IFileInfo, IImage, ISVGImage, \
    IVideo
from pyams_utils.adapter import ContextAdapter, adapter_config
from pyams_utils.registry import get_utility
from pyams_utils.request import check_request


# Chunk size, in bytes (1 << 16 == 65536), used when copying data into a blob
# and when iterating over a file's content.
BLOCK_SIZE = 1 << 16

# Mapping of file extensions (lowercase, with leading dot) to the file name of
# a PNG icon.
# NOTE(review): not used in this part of the module; presumably used to pick
# a thumbnail icon for files which can't be previewed -- confirm against callers.
EXTENSIONS_THUMBNAILS = {
    '.7z': 'application-x-7z-compressed.png',
    '.ac3': 'audio-ac3.png',
    '.afm': 'application-x-font-afm.png',
    '.avi': 'video-x-generic.png',
    '.bmp': 'application-x-egon.png',
    '.bz2': 'application-x-bzip.png',
    '.css': 'text-css.png',
    '.csv': 'text-csv.png',
    '.doc': 'application-msword.png',
    '.docx': 'application-msword.png',
    '.dot': 'application-msword-template.png',
    '.deb': 'application-x-deb.png',
    '.eps': 'image-x-eps.png',
    '.exe': 'application-x-ms-dos-executable.png',
    '.flv': 'application-x-shockwave-flash.png',
    '.gif': 'application-x-egon.png',
    '.gz': 'application-x-bzip.png',
    '.htm': 'application-x-mswinurl.png',
    '.html': 'application-x-mswinurl.png',
    '.jar': 'application-x-java-archive.png',
    '.java': 'text-x-java.png',
    '.jpeg': 'application-x-egon.png',
    '.jpg': 'application-x-egon.png',
    '.js': 'application-javascript.png',
    '.mp2': 'audio-ac3.png',
    '.mp3': 'audio-ac3.png',
    '.mp4': 'application-x-shockwave-flash.png',
    '.mpeg': 'audio-ac3.png',
    '.mpg': 'audio-ac3.png',
    '.mov': 'application-x-shockwave-flash.png',
    '.odf': 'odf.png',
    '.odp': 'application-vnd.oasis.opendocument.presentation.png',
    '.ods': 'application-vnd.oasis.opendocument.spreadsheet.png',
    '.odt': 'application-msword.png',
    '.ogg': 'audio-x-flac+ogg.png',
    '.otf': 'application-x-font-otf.png',
    '.otp': 'application-vnd.oasis.opendocument.presentation-template.png',
    '.ots': 'application-vnd.oasis.opendocument.spreadsheet-template.png',
    '.ott': 'application-msword-template.png',
    '.pdf': 'application-pdf.png',
    '.php': 'application-x-php.png',
    '.pl': 'application-x-perl.png',
    '.png': 'application-x-egon.png',
    '.ppt': 'application-vnd.ms-powerpoint.png',
    '.ps': 'application-postscript.png',
    '.psd': 'application-x-krita.png',
    '.py': 'text-x-python.png',
    '.rpm': 'application-x-rpm.png',
    '.rdf': 'text-rdf+xml.png',
    '.rtf': 'application-rtf.png',
    '.sql': 'text-x-sql.png',
    '.svg': 'application-x-kontour.png',
    '.tif': 'application-x-egon.png',
    '.tiff': 'application-x-egon.png',
    '.ttf': 'application-x-font-ttf.png',
    '.txt': 'text-plain.png',
    '.vhd': 'application-x-smb-workgroup.png',
    '.xls': 'application-vnd.ms-excel.png',
    '.xlsx': 'application-vnd.ms-excel.png',
    '.xml': 'application-xml.png',
    '.wav': 'audio-x-adpcm.png',
    '.webm': 'application-x-shockwave-flash.png',
    '.wmf': 'application-x-wmf.png',
    '.wmv': 'video-x-generic.png',
    '.xcf': 'application-x-krita.png',
    '.zip': 'application-x-7z-compressed.png'
}


#
# Blobs references manager utility
#

@implementer(IBlobReferenceManager)
class BlobReferencesManager(Persistent, Contained):
    """Global blobs references manager utility

    The utility is used to keep all references of persistent files objects to
    their blobs.
    References management is done automatically when using file-related
    properties, like :ref:`pyams_file.property.FileProperty` or
    :ref:`pyams_i18n.property.I18nFileProperty`.

    References are kept in :attr:`refs`, an ``OOBTree`` whose keys are the
    ``oid_repr()`` of blob OIDs and whose values are the sets of objects
    referencing each blob.
    """

    def __init__(self):
        self.refs = OOBTree()

    def add_reference(self, blob, reference):
        """Record that *reference* uses *blob*

        :param blob: the blob being referenced
        :param reference: the object holding the blob; when the blob has no
            OID yet, it is added to this object's ``_p_jar`` connection
        """
        oid = getattr(blob, '_p_oid')
        if not oid:
            # Blob has no OID yet: add it to the connection of the
            # referencing object, then read the OID again
            getattr(reference, '_p_jar').add(blob)
            oid = getattr(blob, '_p_oid')
        oid = oid_repr(oid)
        refs = self.refs.get(oid) or set()
        refs.add(reference)
        # Values are plain sets: store the set again after updating it
        self.refs[oid] = refs

    def drop_reference(self, blob, reference):
        """Forget that *reference* uses *blob*

        The blob entry is removed from :attr:`refs` when its last reference
        is dropped. The blob object itself is not touched here: the former
        ``del blob`` statements only unbound the local name and had no other
        effect, so they have been removed.

        :param blob: the blob which was referenced
        :param reference: the object which doesn't use the blob anymore
        """
        oid = oid_repr(getattr(blob, '_p_oid'))
        refs = self.refs.get(oid)
        if refs is None:
            return
        refs.discard(reference)
        if refs:
            self.refs[oid] = refs
        else:
            del self.refs[oid]
#
# Persistent file class
#
@implementer(IFile, IFileInfo, IContained)
class File(Persistent, Contained):
    """Generic file persistent object

    File content is stored in a ZODB blob. The :attr:`data` property reads
    the whole blob content, or replaces it with a new blob filled from a
    string, a bytes object or a file-like object.
    """

    title = FieldProperty(IFileInfo['title'])
    description = FieldProperty(IFileInfo['description'])
    filename = FieldProperty(IFileInfo['filename'])
    language = FieldProperty(IFileInfo['language'])

    # Default size, so that get_size() and truth testing also work on a file
    # which was created without any data
    _size = 0

    def __init__(self, data='', content_type=None, source=None):
        """Create a new file

        :param data: initial content (str, bytes or file-like object)
        :param content_type: content type, stored as is
        :param source: path of an existing file to load content from; only
            used when *data* is empty, and ignored if the path doesn't exist
        """
        self.content_type = content_type
        self._blob = None
        if data:
            self.data = data
        elif source:
            if os.path.exists(source):
                # Context manager: a failing open() raises its own error
                # instead of a NameError from a "finally" clause
                with open(source, 'rb') as f:
                    self.data = f

    def init_blob(self):
        """Drop the reference to the current blob, if any, and create a new empty one

        No reference to the new blob is registered here; see
        :meth:`add_blob_reference`.
        """
        self.remove_blob_reference()
        self._blob = Blob()

    def add_blob_reference(self, reference=None):
        """Add reference to internal blob

        :param reference: object to register as blob user; defaults to this file
        """
        if self._blob is not None:
            references = get_utility(IBlobReferenceManager)
            references.add_reference(self._blob, reference if reference is not None else self)

    def remove_blob_reference(self):
        """Remove reference to internal blob

        The file is removed from the references registered for its blob by
        the blob references manager, then forgets the blob.
        """
        if self._blob is not None:
            references = get_utility(IBlobReferenceManager)
            references.drop_reference(self._blob, self)
            self._blob = None

    def get_blob(self, mode='r'):
        """Open internal blob in given mode

        :return: the opened blob file, or None if the file has no blob;
            closing it is up to the caller
        """
        if self._blob is None:
            return None
        return self._blob.open(mode=mode)

    def get_detached_blob(self):
        """Open the file returned by the blob ``committed()`` method for binary reading

        :return: a regular file object, or None if the file has no blob
        """
        if self._blob is None:
            return None
        return open(self._blob.committed(), 'rb')

    def _get_data(self):
        """Read the whole blob content; returns None if the file has no blob"""
        f = self.get_blob()
        if f is None:
            return None
        try:
            data = f.read()
            return data
        finally:
            f.close()

    def _set_data(self, data):
        """Store *data* into a new blob and update the file size

        Strings are encoded in UTF-8; seekable objects are rewound first and
        objects with a ``read`` method are copied by chunks of BLOCK_SIZE.
        """
        self.init_blob()
        if isinstance(data, str):
            data = data.encode('utf-8')
        elif hasattr(data, 'seek'):
            data.seek(0)
        f = self.get_blob('w')
        try:
            if hasattr(data, 'read'):
                self._size = 0
                _data = data.read(BLOCK_SIZE)
                size = len(_data)
                while size > 0:
                    f.write(_data)
                    self._size += size
                    _data = data.read(BLOCK_SIZE)
                    size = len(_data)
            else:
                f.write(data)
                self._size = len(data)
        finally:
            f.close()

    data = property(_get_data, _set_data)

    def get_size(self):
        """Get the size of the content which was last stored, or 0 if none was"""
        return self._size

    def __enter__(self):
        # Mode 'c' opens blob committed data
        return self.get_blob(mode='c')

    def __exit__(self, exc_type, exc_val, exc_tb):
        # NOTE(review): the blob file returned by __enter__ is not closed
        # here; callers using "with" have to close it by themselves
        pass

    def __iter__(self):
        """Iterate over blob committed data by chunks of BLOCK_SIZE bytes"""
        if self._blob is None:
            # A plain return ends the generator: raising StopIteration inside
            # a generator is turned into a RuntimeError (PEP 479)
            return
        f = self.get_blob(mode='c')
        try:
            while True:
                chunk = f.read(BLOCK_SIZE)
                if not chunk:
                    return
                yield chunk
        finally:
            f.close()

    def __bool__(self):
        """A file is true when it is not empty"""
        return self._size > 0

    # Python 2 name of the truth testing hook, kept for existing callers
    __nonzero__ = __bool__
@subscriber(IObjectAddedEvent, context_selector=IFile)
def handle_added_file(event):
    """Register the blob reference of a file which has just been added"""
    added_file = event.object
    added_file.add_blob_reference()
@subscriber(IObjectRemovedEvent, context_selector=IFile)
def handle_removed_file(event):
    """Drop the blob reference of a file which has just been removed"""
    removed_file = event.object
    removed_file.remove_blob_reference()
@adapter_config(context=IFile, provides=ICopyHook)
class BlobFileCopyHook(ContextAdapter):
    """Copy hook for blob files

    Based on the approach used by the z3c.blobfile package: the copy shares
    the blob of the original file instead of duplicating its content.
    """

    def __call__(self, toplevel, register):
        # Register the blob handling callback, then raise ResumeCopy
        register(self._copy_blob)
        raise ResumeCopy

    def _copy_blob(self, translate):
        """Make the copy point to the original blob and register its reference"""
        original = self.context
        clone = translate(original)
        clone._blob = original._blob
        clone.add_blob_reference(clone)
#
# Persistent images
#
@implementer(IImage)
class ImageFile(File):
    """Image file persistent object

    In addition to file content, the image size (width, height) is stored in
    :attr:`image_size` each time data is set.
    """

    # Size used until some data has been stored
    image_size = (-1, -1)

    def _set_data(self, data):
        """Store image data and update :attr:`image_size`

        Strings and bytes are wrapped into a BytesIO so that the same object
        can be stored and then opened by PIL.
        """
        if isinstance(data, str):
            data = BytesIO(data.encode('utf-8'))
        elif isinstance(data, bytes):
            data = BytesIO(data)
        File._set_data(self, data)
        if hasattr(data, 'seek'):
            data.seek(0)
        img = Image.open(data)
        self.image_size = img.size

    data = property(File._get_data, _set_data)

    def get_image_size(self):
        """Get image size as a (width, height) tuple"""
        return self.image_size

    def _transform(self, operation):
        """Apply *operation* to the stored image and store the result

        :param operation: callable receiving the opened PIL image and
            returning the new image, or None to leave the file unchanged

        The result is saved with the format of the original image, then
        stored as new file data before a FileModifiedEvent is notified.
        """
        source = self.get_blob(mode='c')
        try:
            image = Image.open(source)
            result = operation(image)
            if result is None:
                return
            new_image = BytesIO()
            result.save(new_image, image.format, quality=99)
        finally:
            # Release the blob file before the blob is replaced
            if source is not None:
                source.close()
        self.data = new_image
        request = check_request()
        request.registry.notify(FileModifiedEvent(self))

    def resize(self, width, height, keep_ratio=True):
        """Resize image to given size

        Nothing is done when both *width* and *height* are greater than or
        equal to the current image size.

        :param width: target width, in pixels
        :param height: target height, in pixels
        :param keep_ratio: if True, the same ratio (the smallest of both) is
            applied to width and height
        """
        def scale(image):
            current_width, current_height = image.size
            if width >= current_width and height >= current_height:
                return None
            w_ratio = 1. * width / current_width
            h_ratio = 1. * height / current_height
            if keep_ratio:
                w_ratio = h_ratio = min(w_ratio, h_ratio)
            # ANTIALIAS is the former name of the LANCZOS filter and is
            # missing from recent Pillow releases
            resample = getattr(Image, 'LANCZOS', None)
            if resample is None:
                resample = Image.ANTIALIAS
            new_size = (round(w_ratio * current_width), round(h_ratio * current_height))
            return image.resize(new_size, resample)

        self._transform(scale)

    def crop(self, x1, y1, x2, y2):
        """Crop image to the box defined by (x1, y1) and (x2, y2) coordinates"""
        self._transform(lambda image: image.crop((x1, y1, x2, y2)))

    def rotate(self, angle=-90):
        """Rotate image by given angle, in degrees; canvas is expanded to fit the result"""
        self._transform(lambda image: image.rotate(angle, expand=True))
@implementer(ISVGImage)
class SVGImageFile(File):
    """Persistent file holding an SVG image"""
@implementer(IVideo)
class VideoFile(File):
    """Persistent file holding a video"""
@implementer(IAudio)
class AudioFile(File):
    """Persistent file holding an audio track"""
#
# Generic files utilities
#
def get_magic_content_type(input):
    """Get content-type based on magic library

    As libmagic bindings are provided via several 'magic' packages, we try
    their entry points in order: ``detect_from_content`` first, then
    ``from_buffer``.

    :param input: content to check, as str, bytes or file-like object;
        a seekable object is rewound and a readable object is read entirely
    :return: the MIME type given by the magic package (:func:`FileFactory`
        handles it as a string), or None if no magic package is available
        or if nothing was detected
    """
    if magic is None:
        return None
    if hasattr(input, 'seek'):
        input.seek(0)
    if hasattr(input, 'read'):
        input = input.read()
    if isinstance(input, str):
        # Same encoding as the one used by File to store strings
        input = input.encode('utf-8')
    if hasattr(magic, 'detect_from_content'):
        result = magic.detect_from_content(input)
        return result.mime_type if result else None
    if hasattr(magic, 'from_buffer'):
        return magic.from_buffer(input, mime=True)
    return None
def FileFactory(data):
    """File object factory

    Automatically create the right file type based on magic content-type
    recognition.

    :param data: file content, as accepted by :func:`get_magic_content_type`
    :return: an SVGImageFile, ImageFile, VideoFile or AudioFile according to
        the detected content type, or a generic File for any other type and
        when no content type could be detected
    """
    content_type = get_magic_content_type(data)
    if not content_type:
        # No magic package or nothing detected: don't fail on a None
        # content type, use a generic file
        factory = File
    elif content_type.startswith('image/svg'):
        # Checked before 'image/' which would also match
        factory = SVGImageFile
    elif content_type.startswith('image/'):
        factory = ImageFile
    elif content_type.startswith('video/'):
        factory = VideoFile
    elif content_type.startswith('audio/'):
        factory = AudioFile
    else:
        factory = File
    return factory(data, content_type)