#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
__docformat__ = 'restructuredtext'
# import standard library
import os.path
import subprocess
from tempfile import NamedTemporaryFile
# import interfaces
from pyams_file.interfaces import IVideo, IThumbnails, IThumbnailFile
from pyams_media.interfaces import IVideoType
from pyams_utils.interfaces.tales import ITALESExtension
from zope.annotation.interfaces import IAnnotations
from zope.traversing.interfaces import ITraversable
# import packages
import transaction
from pyams_file.file import ImageFile, get_magic_content_type
from pyams_file.image import ThumbnailGeometry
from pyams_media.ffbase import FFmpeg
from pyams_utils.adapter import adapter_config, ContextAdapter, ContextRequestViewAdapter
from pyramid.threadlocal import get_current_registry
from zope.interface import alsoProvides, Interface
from zope.lifecycleevent import ObjectCreatedEvent, ObjectAddedEvent
from zope.location import locate
# Annotation key under which the thumbnail image generated for a video is stored
THUMBNAIL_ANNOTATION_KEY = 'pyams_media.video.thumbnail'
# Annotation key under which the image size computed for a video is cached
THUMBNAIL_SIZE_ANNOTATION_KEY = 'pyams_media.video.thumbnail_size'
@adapter_config(context=IVideo, provides=IThumbnails)
class VideoThumbnailAdapter(object):
    """Video thumbnail adapter

    Adapts a video to :class:`IThumbnails`.  A single frame is extracted from
    the video with the ``avconv`` command-line tool and stored as an image in
    the video annotations; once this image exists, thumbnails operations are
    delegated to the image's own :class:`IThumbnails` adapter.
    """

    def __init__(self, video):
        """Store adapted video and load its thumbnail image, if any, from annotations"""
        self.video = video
        annotations = self.annotations = IAnnotations(video)
        self.thumbnail = annotations.get(THUMBNAIL_ANNOTATION_KEY)

    def get_image_size(self):
        """Get size of the video frame

        The size is read from annotations when already known; otherwise it is
        taken from the stored thumbnail image or, when there is none, from the
        video streams reported by ``avprobe``.  A size found this way is cached
        in annotations.

        :return: the stored or computed size (a *(width, height)* tuple when
            computed from video streams), or None if it can't be determined
        """
        image_size = self.annotations.get(THUMBNAIL_SIZE_ANNOTATION_KEY)
        if image_size is None:
            if self.thumbnail is not None:
                image_size = self.thumbnail.get_image_size()
            else:
                mpeg = FFmpeg('avprobe')
                streams = mpeg.info(self.video).get('streams', ())
                for stream in streams:
                    if stream.get('codec_type') != 'video':
                        continue
                    image_size = stream.get('width'), stream.get('height')
            if image_size is not None:
                self.annotations[THUMBNAIL_SIZE_ANNOTATION_KEY] = image_size
        return image_size

    def get_thumbnail_size(self, thumbnail_name, forced=False):
        """Get size of given thumbnail

        Delegates to the stored thumbnail image when there is one; otherwise
        the size of the video frame is returned.
        """
        if self.thumbnail is not None:
            return IThumbnails(self.thumbnail).get_thumbnail_size(thumbnail_name, forced)
        return self.get_image_size()

    def get_geometry(self, selection_name):
        """Get geometry of given selection

        Delegates to the stored thumbnail image when there is one; otherwise
        a geometry covering the whole video frame is returned, or None when
        the frame size is unknown.
        """
        if self.thumbnail is not None:
            return IThumbnails(self.thumbnail).get_geometry(selection_name)
        size = self.get_image_size()
        if not size:
            return None
        geometry = ThumbnailGeometry()
        geometry.x1 = 0
        geometry.y1 = 0
        geometry.x2 = size[0]
        geometry.y2 = size[1]
        return geometry

    def set_geometry(self, selection_name, geometry):
        """Set geometry of given selection; ignored while no thumbnail image is stored"""
        if self.thumbnail is not None:
            IThumbnails(self.thumbnail).set_geometry(selection_name, geometry)

    def clear_geometries(self):
        """Remove all selections geometries; ignored while no thumbnail image is stored"""
        if self.thumbnail is not None:
            IThumbnails(self.thumbnail).clear_geometries()

    def get_thumbnail_name(self, thumbnail_name, with_size=False):
        """Get name of given thumbnail

        Delegates to the stored thumbnail image when there is one; otherwise
        the name is built as ``{width}x{height}`` from the video frame size.

        :return: the thumbnail name, or a *(name, size)* tuple if *with_size*
            is true; *(None, None)* when the frame size is unknown
        """
        if self.thumbnail is not None:
            return IThumbnails(self.thumbnail).get_thumbnail_name(thumbnail_name, with_size)
        size = self.get_image_size()
        if size is None:
            # NOTE(review): a tuple is returned here even when with_size is
            # false; kept as is because callers may rely on it -- confirm
            return None, None
        name = '{0}x{1}'.format(*size)
        if with_size:
            return name, size
        return name

    def _extract_frame(self, time):
        """Run avconv to extract a single frame of the video

        :param time: value given to avconv ``-ss`` option (seek position,
            presumably in seconds)
        :return: avconv standard output; empty if no frame could be extracted
        """
        data = self.video.data
        options = ('-ss', str(time), '-f', 'image2', '-vframes', '1', '-')
        pipe = subprocess.Popen(('avconv', '-i', '-') + options,
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, _stderr = pipe.communicate(data)
        if not stdout:
            # Some videos formats can't be converted via pipes
            # If so, we must provide a temporary file, which is removed as
            # soon as the conversion is done
            with NamedTemporaryFile(prefix='video_', suffix='.thumb') as output:
                output.write(data)
                output.flush()
                pipe = subprocess.Popen(('avconv', '-i', output.name) + options,
                                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                stdout, _stderr = pipe.communicate()
        return stdout

    def get_thumbnail(self, thumbnail_name, format=None, time=5):
        """Get requested thumbnail of the video

        On first call, a frame is extracted from the video and stored as an
        image in the video annotations, and the transaction is committed.
        The requested thumbnail is then generated from this image; a "play"
        mask is given as watermark unless the requested name matches the
        ``{width}x{height}`` name of the full frame.

        :param thumbnail_name: name of the requested thumbnail
        :param format: image format, given as is to the image thumbnails adapter
        :param time: position of the extracted frame (avconv ``-ss`` option)
        :return: the thumbnail returned by the image thumbnails adapter, or
            None if no frame could be extracted from the video
        """
        if self.thumbnail is None:
            frame = self._extract_frame(time)
            if not frame:
                # Don't store (and commit) an empty image as video thumbnail
                return None
            # Create final image
            registry = get_current_registry()
            annotations = IAnnotations(self.video)
            image = ImageFile(frame)
            image.content_type = get_magic_content_type(image.data)
            alsoProvides(image, IThumbnailFile)
            registry.notify(ObjectCreatedEvent(image))
            self.thumbnail = annotations[THUMBNAIL_ANNOTATION_KEY] = image
            locate(self.thumbnail, self.video)
            registry.notify(ObjectAddedEvent(image, self.video))
            transaction.commit()
        thumbnails = IThumbnails(self.thumbnail)
        size_name = '{0[0]}x{0[1]}'.format(self.get_image_size())
        if thumbnail_name != size_name:
            watermark = os.path.abspath(os.path.join(__file__, '..',
                                                     'skin', 'resources', 'img', 'video-play-mask.png'))
            return thumbnails.get_thumbnail(thumbnail_name, format, watermark)
        return thumbnails.get_thumbnail(thumbnail_name, format)

    def delete_thumbnail(self, thumbnail_name):
        """Remove the stored thumbnail image, whatever the given thumbnail name"""
        annotations = IAnnotations(self.video)
        if THUMBNAIL_ANNOTATION_KEY in annotations:
            del annotations[THUMBNAIL_ANNOTATION_KEY]
        # Forget the removed image so that it's not used anymore by this adapter
        self.thumbnail = None

    def clear_thumbnails(self):
        """Remove the stored thumbnail image"""
        annotations = IAnnotations(self.video)
        if THUMBNAIL_ANNOTATION_KEY in annotations:
            del annotations[THUMBNAIL_ANNOTATION_KEY]
        self.thumbnail = None
@adapter_config(name='thumb', context=IVideo, provides=ITraversable)
class ThumbnailTraverser(ContextAdapter):
    """++thumb++ video namespace traverser

    Traversed name is a thumbnail name, optionally followed by a dot and
    an image format.
    """

    def traverse(self, name, furtherpath=None):
        """Get video thumbnail matching given name, then commit transaction

        :param name: thumbnail name; text following the last dot, if any,
            is given as image format to the thumbnails adapter
        :param furtherpath: unused
        :return: result of the thumbnails adapter ``get_thumbnail`` method
        """
        base_name, separator, extension = name.rpartition('.')
        if not separator:
            # No dot in name: the whole name is the thumbnail name
            base_name, extension = name, None
        thumbnail = IThumbnails(self.context).get_thumbnail(base_name, extension)
        transaction.commit()
        return thumbnail
#
# Custom video types
#
@adapter_config(context=IVideo, provides=IVideoType)
class VideoTypeAdapter(ContextAdapter):
    """Default video content type adapter

    Registered without name for any video.
    """

    @property
    def video_type(self):
        """Content type of the adapted video, returned unchanged"""
        video = self.context
        return video.content_type
@adapter_config(name='video/x-flv', context=IVideo, provides=IVideoType)
class FlashVideoTypeAdapter(ContextAdapter):
    """Flash video content type adapter

    Registered under the 'video/x-flv' name.
    """

    @property
    def video_type(self):
        """Video type of Flash videos, whatever the adapted video"""
        return 'video/flash'
@adapter_config(name='video_type', context=(Interface, Interface, Interface), provides=ITALESExtension)
class VideoTypeExtension(ContextRequestViewAdapter):
    """extension:video_type(media) TALES extension

    Renders the video type of a media, as provided by its
    :class:`IVideoType` adapter.
    """

    def render(self, context=None):
        """Get video type of given context

        The :class:`IVideoType` adapter named after the context content type
        is looked up first, then the unnamed one.

        :param context: rendered media; defaults to the adapter context
        :return: video type as text, or None if context is not a video or
            if no adapter is found
        """
        media = self.context if context is None else context
        if not IVideo.providedBy(media):
            return None
        query_adapter = self.request.registry.queryAdapter
        mime_type = media.content_type
        if isinstance(mime_type, bytes):
            mime_type = mime_type.decode()
        type_adapter = query_adapter(media, IVideoType, name=mime_type)
        if type_adapter is None:
            # No adapter dedicated to this content type: use the unnamed one
            type_adapter = query_adapter(media, IVideoType)
        if type_adapter is None:
            return None
        result = type_adapter.video_type
        return result.decode() if isinstance(result, bytes) else result