# Source code for pyams_media.utility
#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
__docformat__ = 'restructuredtext'
# import standard library
# import interfaces
from pyams_media.interfaces import IMediaConversionUtility, CONVERTER_HANDLER_KEY, CUSTOM_AUDIO_TYPES, \
CUSTOM_VIDEO_TYPES, CONVERTER_AUTH_KEY
from zope.intid.interfaces import IIntIds
# import packages
from persistent import Persistent
from pyams_utils.registry import get_utility
from pyams_zmq.socket import zmq_socket, zmq_response
from pyramid.threadlocal import get_current_registry
from zope.container.contained import Contained
from zope.interface import implementer
from zope.schema.fieldproperty import FieldProperty
@implementer(IMediaConversionUtility)
class MediaConversionUtility(Persistent, Contained):
    """Medias conversions utility

    Persistent utility which stores the conversion settings declared by
    :py:class:`IMediaConversionUtility` and sends conversion requests to the
    conversion process through the socket returned by :py:meth:`get_socket`.
    """

    # Each setting is backed by the schema field of the same name declared
    # on IMediaConversionUtility.
    zodb_name = FieldProperty(IMediaConversionUtility['zodb_name'])

    video_formats = FieldProperty(IMediaConversionUtility['video_formats'])
    video_frame_size = FieldProperty(IMediaConversionUtility['video_frame_size'])
    video_bitrate = FieldProperty(IMediaConversionUtility['video_bitrate'])
    video_quantisation = FieldProperty(IMediaConversionUtility['video_quantisation'])
    video_audio_sampling = FieldProperty(IMediaConversionUtility['video_audio_sampling'])
    video_audio_bitrate = FieldProperty(IMediaConversionUtility['video_audio_bitrate'])

    audio_formats = FieldProperty(IMediaConversionUtility['audio_formats'])
    audio_sampling = FieldProperty(IMediaConversionUtility['audio_sampling'])
    audio_bitrate = FieldProperty(IMediaConversionUtility['audio_bitrate'])

    def check_media_conversion(self, media):
        """Check if conversion is needed for given media

        The media content type (decoded first when given as bytes) selects
        the list of requested formats: ``audio_formats`` for ``audio/*`` and
        ``CUSTOM_AUDIO_TYPES`` contents, otherwise ``video_formats`` for
        ``video/*`` and ``CUSTOM_VIDEO_TYPES`` contents. A list is only used
        when it is not empty; one conversion request is then sent for each
        format it contains.

        :param media: object providing a ``content_type`` attribute
        :return: None; responses to the conversion requests are discarded
        """
        content_type = media.content_type
        if content_type is None:
            # Without a content type nothing can match audio or video
            # formats, so there is nothing to convert.
            return
        if isinstance(content_type, bytes):
            content_type = content_type.decode()
        if self.audio_formats and \
                (content_type.startswith('audio/') or (content_type in CUSTOM_AUDIO_TYPES)):
            requested_formats = self.audio_formats
        elif self.video_formats and \
                (content_type.startswith('video/') or (content_type in CUSTOM_VIDEO_TYPES)):
            requested_formats = self.video_formats
        else:
            requested_formats = ()
        for requested_format in requested_formats:
            self.convert(media, requested_format)

    def get_socket(self):
        """Get socket used to send requests to the conversion process

        The handler is read from the current registry settings under
        ``CONVERTER_HANDLER_KEY``; the optional ``CONVERTER_AUTH_KEY`` setting
        is passed to ``zmq_socket`` as its ``auth`` argument.

        :return: the result of ``zmq_socket``, or None when no handler is set
        """
        registry = get_current_registry()
        handler = registry.settings.get(CONVERTER_HANDLER_KEY, False)
        if not handler:
            return None
        return zmq_socket(handler, auth=registry.settings.get(CONVERTER_AUTH_KEY))

    def convert(self, media, format):
        """Send conversion request for given media

        :param media: media to convert; it is registered with the ``IIntIds``
            utility and only the resulting id is sent in the request
        :param format: requested conversion format (the name shadows the
            builtin but is kept, as callers may pass it by keyword)
        :return: the value returned by ``zmq_response``, or a
            ``[501, message]`` list when no socket handler is configured
        """
        socket = self.get_socket()
        if socket is None:
            return [501, "No socket handler defined in configuration file"]
        intids = get_utility(IIntIds)
        settings = {'zodb_name': self.zodb_name,
                    'media': intids.register(media),
                    'format': format}
        socket.send_json(['convert', settings])
        return zmq_response(socket)

    def test_process(self):
        """Send test request to conversion process

        :return: the value returned by ``zmq_response``, or a
            ``[501, message]`` list when no socket handler is configured
        """
        socket = self.get_socket()
        if socket is None:
            return [501, "No socket handler defined in configuration file"]
        socket.send_json(['test', {}])
        return zmq_response(socket)