Source code for pyams_media.media

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'

import re
from mimetypes import guess_extension, guess_type

from pyramid.events import subscriber
from transaction.interfaces import ITransactionManager
from zope.container.folder import Folder
from zope.interface import Interface, alsoProvides
from zope.lifecycleevent import ObjectCreatedEvent
from zope.lifecycleevent.interfaces import IObjectAddedEvent
from zope.traversing.interfaces import ITraversable

from pyams_file.file import FileFactory
from pyams_file.interfaces import IFile
from pyams_media.ffbase import FFmpeg
from pyams_media.ffdocument import FFDocument
from pyams_media.interfaces import CUSTOM_AUDIO_TYPES, CUSTOM_VIDEO_TYPES, IMediaConversion, IMediaConversionUtility, \
    IMediaConversions, IMediaInfo, MEDIA_CONVERSIONS_KEY, VIDEO_FRAME_SIZE
from pyams_utils.adapter import ContextAdapter, ContextRequestViewAdapter, adapter_config, get_annotation_adapter
from pyams_utils.factory import factory_config
from pyams_utils.interfaces.tales import ITALESExtension
from pyams_utils.registry import get_global_registry, query_utility

#
# Media infos
#

MEDIA_INFO_KEY = 'pyams_media.media.info'


[docs]@adapter_config(context=IFile, provides=IMediaInfo) def media_info_factory(context): """Media info adapter""" def media_info_adapter_factory(): return FFmpeg('avprobe').info(context) content_type = context.content_type if isinstance(content_type, bytes): content_type = content_type.decode() if not (content_type.startswith('audio/') or content_type.startswith('video/') or content_type in (CUSTOM_AUDIO_TYPES + CUSTOM_VIDEO_TYPES)): return None return get_annotation_adapter(context, MEDIA_INFO_KEY, factory=media_info_adapter_factory, notify=False, locate=False)
# # Media conversions # MEDIA_FRAME_SIZE = re.compile(".*-(.*)\..*")
[docs]@factory_config(IMediaConversions) class MediaConversions(Folder): """Media conversions"""
[docs] def add_conversion(self, conversion, format, extension=None, width=None): target = FileFactory(conversion) registry = get_global_registry() registry.notify(ObjectCreatedEvent(target)) alsoProvides(target, IMediaConversion) if extension is None: extension = guess_extension(format) content_type = target.content_type if isinstance(content_type, bytes): content_type = content_type.decode() target_name = '{name}{width}.{extension}'.format(name=content_type.split('/', 1)[0] if content_type else 'media', width='-{0}'.format(width) if width else '', extension=extension) if target_name in self: del self[target_name] target.filename = target_name self[target_name] = target return target
[docs] def get_conversions(self, with_source=False, order=None): result = [] if with_source: result.append(self.__parent__) result.extend(list(self.values())) if order: order = tuple(map(lambda x: x.decode() if isinstance(x, bytes) else x, order)) result.sort(key=lambda x: (order.index(x.content_type) if x.content_type in order else 999, self.get_conversion_width(x.filename) or 9999)) return result
[docs] @staticmethod def get_conversion_width(name): match = MEDIA_FRAME_SIZE.match(name) if match: return VIDEO_FRAME_SIZE.get(match.groups()[0])[0]
[docs] def has_conversion(self, formats): for conversion in self.get_conversions(): if conversion.content_type in formats: return True return False
[docs]@adapter_config(context=IFile, provides=IMediaConversions) def media_conversions_factory(context): """Media conversions factory""" return get_annotation_adapter(context, MEDIA_CONVERSIONS_KEY, IMediaConversions, name='++conversions++')
[docs]@adapter_config(name='conversions', context=IFile, provides=ITraversable) class MediaConversionsTraverser(ContextAdapter): """++conversions++ file traverser"""
[docs] def traverse(self, name, furtherpath=None): return IMediaConversions(self.context)
[docs]@adapter_config(name='conversions', context=(Interface, Interface, Interface), provides=ITALESExtension) class ConversionsExtension(ContextRequestViewAdapter): """extension:conversions(media) TALES extension"""
[docs] def render(self, context=None): if context is None: context = self.context return IMediaConversions(context, None)
# # Media files events #
[docs]def check_media_conversion(status, media): if not status: # aborted transaction return converter = query_utility(IMediaConversionUtility) if converter is not None: converter.check_media_conversion(media)
[docs]@subscriber(IObjectAddedEvent, context_selector=IFile) def handle_added_media(event): """Handle added media file""" media = event.object # Don't convert images or already converted files! if IMediaConversion.providedBy(media): return content_type = media.content_type if isinstance(content_type, bytes): content_type = content_type.decode() if (not content_type) or content_type.startswith('image/'): return # Try to use FFMpeg if content type is unknown... media_type = content_type.startswith('audio/') or \ content_type.startswith('video/') or \ content_type in (CUSTOM_AUDIO_TYPES + CUSTOM_VIDEO_TYPES) if not media_type: document = FFDocument(media) metadata = document.__metadata__ media_type = metadata.get('vtype') if media_type: ext = media_type.split('.')[0] content_type = guess_type('media.{0}'.format(ext))[0] if content_type is not None: media.content_type = content_type if media_type: ITransactionManager(media).get().addAfterCommitHook(check_media_conversion, kws={'media': media})