Source code for pyams_zmq.process

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

"""PyAMS_zmq.process module

A 0MQ process is a "classic" Python subprocess, which is starting a 0MQ event loop on startup
to be able to handle incoming connections.

To each process is attached a messages handler, which is responsible of doing the concrete
messages handling.

Process initialization arguments allows to define a list of client IP addresses which are
allowed to connect to this process, as well as a login/password authentication tokens which
must be provided to connect to this process.
"""

import multiprocessing
import signal
import sys

import zmq
from tornado import ioloop
from zmq.auth.thread import ThreadAuthenticator
from zmq.eventloop import zmqstream
from zope.interface import implementer

from pyams_utils.registry import get_global_registry
from pyams_zmq.interfaces import IZMQProcess, ZMQProcessStartedEvent


__docformat__ = 'restructuredtext'


[docs]@implementer(IZMQProcess) class ZMQProcess(multiprocessing.Process): # pylint: disable=too-many-instance-attributes """ This is the base for all processes and offers utility methods for setup and creating new streams. """ socket_type = zmq.REP # pylint: disable=no-member auth_thread = None def __init__(self, bind_addr, handler, auth=None, clients=None): super(ZMQProcess, self).__init__() self.context = None """The ØMQ :class:`~zmq.Context` instance.""" self.loop = None """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`).""" self.bind_addr = bind_addr self.rep_stream = None self.handler = handler self.passwords = dict([auth.split(':', 1)]) if auth else None self.clients = clients.split() if clients else None
[docs] def setup(self): """Creates a :attr:`context` and an event :attr:`loop` for the process.""" ctx = self.context = zmq.Context() auth = self.auth_thread = ThreadAuthenticator(ctx) auth.start() if self.clients: auth.allow(*self.clients) # pylint: disable=not-an-iterable if self.passwords: auth.configure_plain(domain='*', passwords=self.passwords) self.loop = ioloop.IOLoop.current() self.rep_stream, _ = self.stream(self.socket_type, self.bind_addr, bind=True) self.init_stream()
[docs] def init_stream(self): """Initialize response stream""" self.rep_stream.on_recv(self.handler(self, self.rep_stream, self.stop))
[docs] def run(self): """Sets up everything and starts the event loop on process startup""" signal.signal(signal.SIGTERM, self.exit) self.setup() registry = get_global_registry() registry.notify(ZMQProcessStartedEvent(self)) # pylint: disable=no-member self.loop.start()
[docs] def stop(self): """Stops the event loop.""" if self.loop is not None: self.loop.stop() self.loop = None self.auth_thread.stop()
[docs] def exit(self, num, frame): # pylint: disable=unused-argument """Process exit""" self.stop() sys.exit()
[docs] def stream(self, sock_type, addr, bind, callback=None, subscribe=b''): # pylint: disable=too-many-arguments """Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``) :param addr: Address to bind or connect to formatted as *host:port*, *(host, port)* or *host* (bind to random port). If *bind* is ``True``, *host* may be: - the wild-card ``*``, meaning all available interfaces, - the primary IPv4 address assigned to the interface, in its numeric representation or - the interface name as defined by the operating system. If *bind* is ``False``, *host* may be: - the DNS name of the peer or - the IPv4 address of the peer, in its numeric representation. If *addr* is just a host name without a port and *bind* is ``True``, the socket will be bound to a random port. :param bind: Binds to *addr* if ``True`` or tries to connect to it otherwise. :param callback: A callback for :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional :param subscribe: Subscription pattern for *SUB* sockets, optional, defaults to ``b''``. :returns: A tuple containg the stream and the port number. """ sock = self.context.socket(sock_type) # add server authenticator if self.passwords: sock.plain_server = True # addr may be 'host:port' or ('host', port) if isinstance(addr, str): addr = addr.split(':') host, port = addr if len(addr) == 2 else (addr[0], None) # Bind/connect the socket if bind: if port: sock.bind('tcp://%s:%s' % (host, port)) else: port = sock.bind_to_random_port('tcp://%s' % host) else: sock.connect('tcp://%s:%s' % (host, port)) # Add a default subscription for SUB sockets if sock_type == zmq.SUB: # pylint: disable=no-member sock.setsockopt(zmq.SUBSCRIBE, subscribe) # pylint: disable=no-member # Create the stream and add the callback stream = zmqstream.ZMQStream(sock, self.loop) if callback: stream.on_recv(callback) return stream, int(port)
[docs]def process_exit_func(process=None): """Process exit func is required to correctly end the child process""" if process is not None: if process.is_alive(): process.terminate() process.join()