Source code for pyams_media.converter

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'


# import standard library
import mimetypes

from io import StringIO
from tempfile import NamedTemporaryFile

# import interfaces
from pyams_media.interfaces import IMediaVideoConverter, IMediaAudioConverter, IMediaConversionUtility, IMediaConverter, \
    VIDEO_FRAME_SIZE

# import packages
from pyams_media.ffdocument import FFDocument
from pyams_utils.list import unique
from pyams_utils.registry import utility_config, get_utilities_for, query_utility
from pyams_utils.vocabulary import vocabulary_config
from zope.interface import implementer
from zope.schema.vocabulary import SimpleVocabulary, SimpleTerm

from pyams_media import _


[docs]class BaseMediaConverter(object): """Base media converter""" format = None require_temp_file = False
[docs] def require_input_file(self, media): """Check if a physical file is required to handle conversion""" return media.content_type == b'video/quicktime'
[docs] def convert(self, media): """Convert media""" if self.require_input_file(media): input = NamedTemporaryFile(prefix='input_', suffix=mimetypes.guess_extension(media.contentType)) if isinstance(media, str): input.write(media) else: input.write(media.data) input.file.flush() document = FFDocument(input.name) else: if isinstance(media, str): media = StringIO(media) document = FFDocument(media) self.add_common_filters(document) for loop in self.get_conversion_loop(document): if self.require_temp_file: output = NamedTemporaryFile(prefix='media_', suffix='.%s' % self.format) converted = document.get_output(self.format, target=output.name) output.file.seek(0) converted['output'] = output.file.read() yield loop, converted else: yield loop, document.get_output(self.format)
[docs] def add_common_filters(self, document): document.audiochannels(2)
[docs] def add_filters(self, document): pass
[docs] def get_conversion_loop(self, document): self.add_filters(document) yield None
# # Audio converters #
[docs]@implementer(IMediaAudioConverter) class BaseAudioConverter(BaseMediaConverter): """Base media converter"""
[docs] def add_common_filters(self, document): super(BaseAudioConverter, self).add_common_filters(document) utility = query_utility(IMediaConversionUtility) if utility is not None: if utility.audio_sampling: document.audiosampling(utility.audio_sampling) if utility.audio_bitrate: document.audiobitrate(utility.audio_bitrate)
[docs]@utility_config(name='audio/wav', provides=IMediaConverter) class WavAudioConverter(BaseAudioConverter): """Default WAV media converter""" label = _("WAV audio converter") format = 'wav'
[docs]@utility_config(name='audio/mpeg', provides=IMediaConverter) class Mp3AudioConverter(BaseAudioConverter): """Default MP3 media converter""" label = _("MP3 audio converter") format = 'mp3'
[docs]@utility_config(name='audio/ogg', provides=IMediaConverter) class OggAudioConverter(BaseAudioConverter): """Default OGG audio converter""" label = _("OGG audio converter") format = 'ogg'
[docs]@vocabulary_config(name='PyAMS media audio converters') class AudioConvertersVocabulary(SimpleVocabulary): """Audio converters vocabulary""" def __init__(self, context=None): terms = [SimpleTerm(name, title=util.label) for name, util in unique(get_utilities_for(IMediaConverter)) if IMediaAudioConverter.providedBy(util)] super(AudioConvertersVocabulary, self).__init__(terms)
# # Video converters #
[docs]@implementer(IMediaVideoConverter) class BaseVideoConverter(BaseMediaConverter): """Base video converter"""
[docs] def add_filters(self, document): utility = query_utility(IMediaConversionUtility) if utility is not None: if utility.video_bitrate: document.bitrate(utility.video_bitrate) if utility.video_quantisation: document.quantizerscale(utility.video_quantisation) if utility.video_audio_sampling: document.audiosampling(utility.video_audio_sampling) if utility.video_audio_bitrate: document.audiobitrate(utility.video_audio_bitrate)
[docs] def get_conversion_loop(self, document): utility = query_utility(IMediaConversionUtility) if utility is not None: # Get video resolution video_info = document.get_stream_info('video') if video_info is not None: frame_width = video_info.get('width') else: frame_width = (0, 0) # Check same resolution conversion if video_info.get('format', {}).get('format_name') != self.format: yield None # Convert to lower resolutions self.add_filters(document) for size in utility.video_frame_size or (): target_width = VIDEO_FRAME_SIZE.get(size, (0, 0))[0] if target_width < frame_width: document.size(size) yield size
[docs]@utility_config(name='video/x-flv', provides=IMediaConverter) class FlvVideoConverter(BaseVideoConverter): """Default FLV media converter""" label = _("FLV (Flash Video) video converter") format = 'flv'
[docs] def add_common_filters(self, document): super(FlvVideoConverter, self).add_common_filters(document) # Audio sampling is required by FLV converter! utility = query_utility(IMediaConversionUtility) if utility is not None: if utility.video_audio_sampling: document.audiosampling(utility.video_audio_sampling)
[docs]@utility_config(name='video/mp4', provides=IMediaConverter) class Mp4VideoConverter(BaseVideoConverter): """Default MP4 media converter""" label = _("MP4 (HTML5) video converter") format = 'mp4' require_temp_file = True
[docs] def add_common_filters(self, document): super(Mp4VideoConverter, self).add_common_filters(document) effects = document.__effects__ effects['filter:v'] = 'setsar=1/1' effects['pix_fmt'] = 'yuv420p' effects['preset:v'] = 'slow' effects['profile:v'] = 'baseline' effects['x264-params'] = 'level=3.0:ref=1' effects['r:v'] = '25/1' effects['movflags'] = '+faststart' effects['strict'] = 'experimental'
[docs]@utility_config(name='video/ogg', provides=IMediaConverter) class OggVideoConverter(BaseVideoConverter): """OGG media converter""" label = _("OGG video converter") format = 'ogg'
[docs]@utility_config(name='video/webm', provides=IMediaConverter) class WebmVideoConverter(BaseVideoConverter): """WebM Media converter""" label = _("WebM video converter") format = 'webm' require_temp_file = True
[docs] def add_common_filters(self, document): super(WebmVideoConverter, self).add_common_filters(document) effects = document.__effects__ effects['filter:v'] = 'setsar=1/1' effects['pix_fmt'] = 'yuv420p' effects['r:v'] = '25/1'
[docs]@vocabulary_config(name='PyAMS media video converters') class VideoConvertersVocabulary(SimpleVocabulary): """Video converters vocabulary""" def __init__(self, context=None): terms = [SimpleTerm(name, title=util.label) for name, util in unique(get_utilities_for(IMediaConverter)) if IMediaVideoConverter.providedBy(util)] super(VideoConvertersVocabulary, self).__init__(terms)