Source code for pyams_media.video

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'


# import standard library
import os.path
import subprocess

from tempfile import NamedTemporaryFile

# import interfaces
from pyams_file.interfaces import IVideo, IThumbnails, IThumbnailFile
from pyams_media.interfaces import IVideoType
from pyams_utils.interfaces.tales import ITALESExtension
from zope.annotation.interfaces import IAnnotations
from zope.traversing.interfaces import ITraversable

# import packages
import transaction

from pyams_file.file import ImageFile, get_magic_content_type
from pyams_file.image import ThumbnailGeometry
from pyams_media.ffbase import FFmpeg
from pyams_utils.adapter import adapter_config, ContextAdapter, ContextRequestViewAdapter
from pyramid.threadlocal import get_current_registry
from zope.interface import alsoProvides, Interface
from zope.lifecycleevent import ObjectCreatedEvent, ObjectAddedEvent
from zope.location import locate


THUMBNAIL_ANNOTATION_KEY = 'pyams_media.video.thumbnail'
THUMBNAIL_SIZE_ANNOTATION_KEY = 'pyams_media.video.thumbnail_size'


@adapter_config(context=IVideo, provides=IThumbnails)
class VideoThumbnailAdapter(object):
    """Video thumbnail adapter

    Provides IThumbnails for an IVideo.  A still frame is extracted from the
    video with ``avconv`` on first request and stored as an image in the video
    annotations; most operations are then delegated to the IThumbnails
    adapter of that stored image.
    """

    def __init__(self, video):
        """Keep the adapted video, its annotations and the stored thumbnail image (if any)."""
        self.video = video
        annotations = self.annotations = IAnnotations(video)
        self.thumbnail = annotations.get(THUMBNAIL_ANNOTATION_KEY)

    def get_image_size(self):
        """Return the frame size of the video.

        The size is read from the annotations when already known.  Otherwise
        it is taken from the stored thumbnail image or, when there is none,
        from the ``width`` and ``height`` of the video stream(s) reported by
        ``avprobe``; the value found is then stored in the annotations.

        :return: the stored or computed size, or None when it can't be found
        """
        image_size = self.annotations.get(THUMBNAIL_SIZE_ANNOTATION_KEY)
        if image_size is None:
            if self.thumbnail is not None:
                image_size = self.thumbnail.get_image_size()
            else:
                mpeg = FFmpeg('avprobe')
                streams = mpeg.info(self.video).get('streams', ())
                for stream in streams:
                    if stream.get('codec_type') != 'video':
                        continue
                    # no break: when several video streams exist, the last one wins
                    image_size = stream.get('width'), stream.get('height')
            # NOTE(review): nesting reconstructed from a flattened listing;
            # the size is only written back on a cache miss -- confirm
            if image_size is not None:
                self.annotations[THUMBNAIL_SIZE_ANNOTATION_KEY] = image_size
        return image_size

    def get_thumbnail_size(self, thumbnail_name, forced=False):
        """Return the size of the given thumbnail.

        Delegates to the stored image when there is one; otherwise the video
        frame size is returned, whatever the requested thumbnail name is.
        """
        if self.thumbnail is not None:
            return IThumbnails(self.thumbnail).get_thumbnail_size(thumbnail_name, forced)
        return self.get_image_size()

    def get_geometry(self, selection_name):
        """Return the geometry of the given selection.

        Delegates to the stored image when there is one; otherwise a geometry
        covering the whole video frame is built.

        :return: a geometry, or None when the video frame size is unknown
        """
        if self.thumbnail is not None:
            return IThumbnails(self.thumbnail).get_geometry(selection_name)
        size = self.get_image_size()
        if size:
            geometry = ThumbnailGeometry()
            geometry.x1 = 0
            geometry.y1 = 0
            geometry.x2 = size[0]
            geometry.y2 = size[1]
            return geometry
        return None

    def set_geometry(self, selection_name, geometry):
        """Set the selection geometry on the stored image; no-op without stored image."""
        if self.thumbnail is not None:
            IThumbnails(self.thumbnail).set_geometry(selection_name, geometry)

    def clear_geometries(self):
        """Clear the geometries of the stored image; no-op without stored image."""
        if self.thumbnail is not None:
            IThumbnails(self.thumbnail).clear_geometries()

    def get_thumbnail_name(self, thumbnail_name, with_size=False):
        """Return the matching thumbnail name.

        Delegates to the stored image when there is one; otherwise the name
        is built as ``<width>x<height>`` from the video frame size.

        :param with_size: if True, return a ``(name, size)`` tuple
        :return: the name, or ``(name, size)``; when the frame size is
            unknown, ``(None, None)`` is returned whatever *with_size* is
        """
        if self.thumbnail is not None:
            return IThumbnails(self.thumbnail).get_thumbnail_name(thumbnail_name, with_size)
        size = self.get_image_size()
        if size is None:
            return None, None
        name = '{0}x{1}'.format(*size)
        if with_size:
            return name, size
        return name

    @staticmethod
    def _run_avconv(source, time, data=None):
        """Grab a single frame with ``avconv`` and return the process standard output.

        :param source: value of the ``-i`` option (``'-'`` for standard input, or a file path)
        :param time: offset given to the ``-ss`` option
        :param data: bytes sent to the process standard input, if any
        """
        pipe = subprocess.Popen(('avconv', '-i', source, '-ss', str(time),
                                 '-f', 'image2', '-vframes', '1', '-'),
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        stdout, _stderr = pipe.communicate(data)
        return stdout

    def _extract_frame(self, time):
        """Return the image bytes of the frame at offset *time* (empty if extraction failed)."""
        data = self.video.data
        frame = self._run_avconv('-', time, data)
        if not frame:
            # Some video formats can't be converted via pipes;
            # if so, we must provide a temporary file...
            # The context manager guarantees that the file is closed (and so
            # removed) even if avconv can't be started.
            with NamedTemporaryFile(prefix='video_', suffix='.thumb') as output:
                output.write(data)
                output.flush()
                frame = self._run_avconv(output.name, time)
        return frame

    def get_thumbnail(self, thumbnail_name, format=None, time=5):
        """Return the requested thumbnail of the video.

        On first call a frame is extracted from the video, stored as an image
        in the video annotations, and the transaction is committed.

        :param thumbnail_name: name of the requested thumbnail
        :param format: passed unchanged to the image thumbnails adapter
        :param time: offset given to ``avconv`` to select the extracted frame
        :return: the thumbnail, or None when no frame could be extracted
        """
        if self.thumbnail is None:
            frame = self._extract_frame(time)
            if not frame:
                # avconv produced nothing: don't store (and commit) an empty image
                return None
            # Create final image
            registry = get_current_registry()
            annotations = IAnnotations(self.video)
            image = ImageFile(frame)
            image.content_type = get_magic_content_type(image.data)
            alsoProvides(image, IThumbnailFile)
            registry.notify(ObjectCreatedEvent(image))
            self.thumbnail = annotations[THUMBNAIL_ANNOTATION_KEY] = image
            locate(self.thumbnail, self.video)
            registry.notify(ObjectAddedEvent(image, self.video))
            transaction.commit()
        size_name = '{0[0]}x{0[1]}'.format(self.get_image_size())
        thumbnails = IThumbnails(self.thumbnail)
        if thumbnail_name != size_name:
            # any thumbnail not named after the full frame size is requested
            # with the "play" mask image as extra argument
            watermark = os.path.abspath(os.path.join(__file__, '..',
                                                     'skin', 'resources', 'img', 'video-play-mask.png'))
            return thumbnails.get_thumbnail(thumbnail_name, format, watermark)
        return thumbnails.get_thumbnail(thumbnail_name, format)

    def delete_thumbnail(self, thumbnail_name):
        """Remove the stored image from the video annotations.

        The whole stored image is removed, whatever *thumbnail_name* is.
        """
        annotations = IAnnotations(self.video)
        if THUMBNAIL_ANNOTATION_KEY in annotations:
            del annotations[THUMBNAIL_ANNOTATION_KEY]
        # don't keep a reference to the deleted image (same as clear_thumbnails)
        self.thumbnail = None

    def clear_thumbnails(self):
        """Remove the stored image from the video annotations and forget it."""
        annotations = IAnnotations(self.video)
        if THUMBNAIL_ANNOTATION_KEY in annotations:
            del annotations[THUMBNAIL_ANNOTATION_KEY]
        self.thumbnail = None
@adapter_config(name='thumb', context=IVideo, provides=ITraversable)
class ThumbnailTraverser(ContextAdapter):
    """++thumb++ video namespace traverser

    The traversed name is a thumbnail name, optionally followed by a dot and
    an image format.
    """

    def traverse(self, name, furtherpath=None):
        """Return the thumbnail matching *name* for the adapted video.

        The part of *name* after its last dot, if any, is used as the format.
        The current transaction is committed before returning the result.
        """
        head, dot, tail = name.rpartition('.')
        if dot:
            thumbnail_name, format = head, tail
        else:
            # no dot at all: the whole name is the thumbnail name
            thumbnail_name, format = name, None
        result = IThumbnails(self.context).get_thumbnail(thumbnail_name, format)
        transaction.commit()
        return result
#
# Custom video types
#
@adapter_config(context=IVideo, provides=IVideoType)
class VideoTypeAdapter(ContextAdapter):
    """Default video content type adapter

    Registered without a name for any IVideo.
    """

    @property
    def video_type(self):
        """Content type of the adapted video, returned unchanged."""
        video = self.context
        return video.content_type
@adapter_config(name='video/x-flv', context=IVideo, provides=IVideoType)
class FlashVideoTypeAdapter(ContextAdapter):
    """Flash video content type adapter

    Registered under the ``video/x-flv`` name.
    """

    @property
    def video_type(self):
        """Always ``'video/flash'``, whatever the adapted video is."""
        flash_type = 'video/flash'
        return flash_type
@adapter_config(name='video_type', context=(Interface, Interface, Interface), provides=ITALESExtension)
class VideoTypeExtension(ContextRequestViewAdapter):
    """extension:video_type(media) TALES extension

    Renders the video type of a media, as given by its IVideoType adapter.
    """

    @staticmethod
    def _to_text(value):
        """Decode *value* when it is a bytes string, return it unchanged otherwise."""
        if isinstance(value, bytes):
            return value.decode()
        return value

    def render(self, context=None):
        """Return the video type of *context* (of the adapter context by default).

        An IVideoType adapter named after the content type of the video is
        looked up first, then the unnamed one.

        :return: the video type as text; None when the context is not a
            video or when no adapter is found
        """
        media = self.context if context is None else context
        if not IVideo.providedBy(media):
            return None
        registry = self.request.registry
        content_type = self._to_text(media.content_type)
        adapter = registry.queryAdapter(media, IVideoType, name=content_type)
        if adapter is None:
            # no adapter dedicated to this content type: use the default one
            adapter = registry.queryAdapter(media, IVideoType)
        if adapter is None:
            return None
        return self._to_text(adapter.video_type)