#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
__docformat__ = 'restructuredtext'
# import standard library
# import interfaces
from pyams_scheduler.interfaces import IScheduler, ISchedulerHandler, SCHEDULER_HANDLER_KEY, SCHEDULER_AUTH_KEY
from zope.intid.interfaces import IIntIds
# import packages
from pyams_utils.registry import query_utility
from pyams_zmq.socket import zmq_socket, zmq_response
from pyramid.threadlocal import get_current_registry
from zope.container.folder import Folder
from zope.interface import implementer
from zope.schema.fieldproperty import FieldProperty
@implementer(ISchedulerHandler)
class SchedulerHandler(object):
    """Scheduler handler utility.

    This is just a 'marker' utility which is used to mark nodes in a cluster
    which should run the scheduler. It declares the ``ISchedulerHandler``
    interface and carries no state or behaviour of its own.
    """
@implementer(IScheduler)
class Scheduler(Folder):
    """Scheduler utility.

    A folder whose contained values are exposed as scheduler tasks, and which
    can send requests to the scheduler process through a ØMQ socket whose
    address is read from the current registry settings.
    """

    # Attributes backed by the matching IScheduler schema fields.
    zodb_name = FieldProperty(IScheduler['zodb_name'])
    report_mailer = FieldProperty(IScheduler['report_mailer'])
    report_source = FieldProperty(IScheduler['report_source'])

    @property
    def tasks(self):
        """Return a list of all the values stored in this folder."""
        return list(self.values())

    @property
    def history(self):
        """Return the merged history of all tasks, sorted by ``date``.

        The ``history`` items of every contained value are gathered into a
        single new list which is sorted in ascending order of each item's
        ``date`` attribute.
        """
        result = []
        # Plain loop: we only want the side effect of extending ``result``
        for task in self.values():
            result.extend(task.history)
        result.sort(key=lambda x: x.date)
        return result

    @property
    def internal_id(self):
        """Return the result of registering this scheduler with ``IIntIds``.

        Returns ``None`` when no ``IIntIds`` utility can be found.
        """
        intids = query_utility(IIntIds)
        if intids is None:
            return None
        return intids.register(self)

    @staticmethod
    def get_socket():
        """Open ØMQ socket.

        The socket address is read from the current registry settings under
        ``SCHEDULER_HANDLER_KEY``; the value stored under
        ``SCHEDULER_AUTH_KEY`` (if any) is passed as ``auth`` argument.

        :return: the socket returned by ``zmq_socket``, or ``None`` when no
            handler address is configured
        """
        settings = get_current_registry().settings
        handler = settings.get(SCHEDULER_HANDLER_KEY, False)
        if not handler:
            return None
        return zmq_socket(handler, auth=settings.get(SCHEDULER_AUTH_KEY))

    def get_task(self, task_id):
        """Look up an object from its ``IIntIds`` identifier.

        :param task_id: identifier given to ``IIntIds.queryObject``
        :return: the result of ``queryObject``, or ``None`` when no
            ``IIntIds`` utility can be found
        """
        intids = query_utility(IIntIds)
        if intids is None:
            return None
        return intids.queryObject(task_id)

    def _send_request(self, command):
        """Send a parameter-less command to the scheduler process.

        :param command: command name, sent as ``[command, {}]`` in JSON
        :return: the result of ``zmq_response`` for the request, or a
            ``[501, message]`` list when no socket handler is configured
        """
        socket = self.get_socket()
        if socket is None:
            return [501, "No socket handler defined in configuration file"]
        socket.send_json([command, {}])
        return zmq_response(socket)

    def get_jobs(self):
        """Send a ``get_jobs`` request to the scheduler process.

        :return: the scheduler process response, or a ``[501, message]``
            list when no socket handler is configured
        """
        return self._send_request('get_jobs')

    def test_process(self):
        """Send test request to scheduler process.

        :return: the scheduler process response, or a ``[501, message]``
            list when no socket handler is configured
        """
        return self._send_request('test')