# Source code for pyams_scheduler.scheduler

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'


# import standard library

# import interfaces
from pyams_scheduler.interfaces import IScheduler, ISchedulerHandler, SCHEDULER_HANDLER_KEY, SCHEDULER_AUTH_KEY
from zope.intid.interfaces import IIntIds

# import packages
from pyams_utils.registry import query_utility
from pyams_zmq.socket import zmq_socket, zmq_response
from pyramid.threadlocal import get_current_registry
from zope.container.folder import Folder
from zope.interface import implementer
from zope.schema.fieldproperty import FieldProperty


@implementer(ISchedulerHandler)
class SchedulerHandler(object):
    """Scheduler handler utility.

    A pure 'marker' utility: it carries no behaviour of its own and is only
    used to flag the nodes of a cluster which should run the scheduler.
    """
@implementer(IScheduler)
class Scheduler(Folder):
    """Scheduler utility.

    A folder whose contained values are exposed as the scheduler tasks, and
    which can send requests to the scheduler process through a ØMQ socket.
    """

    zodb_name = FieldProperty(IScheduler['zodb_name'])
    report_mailer = FieldProperty(IScheduler['report_mailer'])
    report_source = FieldProperty(IScheduler['report_source'])

    @property
    def tasks(self):
        """Return the values stored in this folder as a new list."""
        return list(self.values())

    @property
    def history(self):
        """Return the history items of all tasks, sorted by their ``date``.

        The ``history`` of every contained task is flattened into a single
        new list, ordered by ascending ``date`` attribute.
        """
        entries = (item for task in self.values() for item in task.history)
        return sorted(entries, key=lambda item: item.date)

    @property
    def internal_id(self):
        """Return the value given by ``IIntIds.register`` for this scheduler.

        Returns ``None`` when no ``IIntIds`` utility can be found.
        """
        intids = query_utility(IIntIds)
        if intids is None:
            return None
        # NOTE(review): reading this property calls register() on every
        # access; presumably it hands back the already assigned id when the
        # scheduler is known to the utility -- confirm against zope.intid.
        return intids.register(self)

    @staticmethod
    def get_socket():
        """Open ØMQ socket.

        The handler is read from the current registry settings under
        ``SCHEDULER_HANDLER_KEY``; the ``SCHEDULER_AUTH_KEY`` setting (or
        ``None`` if unset) is forwarded to ``zmq_socket`` as ``auth``.

        Returns ``None`` when the handler setting is missing or falsy.
        """
        settings = get_current_registry().settings
        handler = settings.get(SCHEDULER_HANDLER_KEY, False)
        if not handler:
            return None
        return zmq_socket(handler, auth=settings.get(SCHEDULER_AUTH_KEY))

    def get_task(self, task_id):
        """Look up the object registered under ``task_id`` in ``IIntIds``.

        Returns the result of ``IIntIds.queryObject(task_id)``, or ``None``
        when no ``IIntIds`` utility can be found. The lookup is not
        restricted to objects contained in this scheduler.
        """
        intids = query_utility(IIntIds)
        if intids is None:
            return None
        return intids.queryObject(task_id)

    def _send_request(self, command):
        """Send ``[command, {}]`` as JSON over the socket and return the reply.

        Returns ``[501, message]`` when ``get_socket`` yields no socket;
        otherwise returns whatever ``zmq_response`` gives for that socket.
        """
        socket = self.get_socket()
        if socket is None:
            return [501, "No socket handler defined in configuration file"]
        socket.send_json([command, {}])
        return zmq_response(socket)

    def get_jobs(self):
        """Send a ``get_jobs`` request to the scheduler process.

        Returns ``[501, message]`` when no socket handler is configured,
        otherwise the value produced by ``zmq_response``.
        """
        return self._send_request('get_jobs')

    def test_process(self):
        """Send test request to scheduler process.

        Returns ``[501, message]`` when no socket handler is configured,
        otherwise the value produced by ``zmq_response``.
        """
        return self._send_request('test')