Source code for pyams_scheduler.trigger

#
# Copyright (c) 2008-2015 Thierry Florac <tflorac AT ulthar.net>
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#

__docformat__ = 'restructuredtext'

from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from persistent import Persistent
from zope.componentvocabulary.vocabulary import UtilityVocabulary
from zope.schema.fieldproperty import FieldProperty

from pyams_scheduler.interfaces import ICronTask, ICronTaskScheduling, IDateTask, IDateTaskScheduling, ILoopTask, \
    ILoopTaskScheduling, ITaskSchedulingMode, SCHEDULER_TASK_CRON_INFO, SCHEDULER_TASK_DATE_INFO, \
    SCHEDULER_TASK_LOOP_INFO
from pyams_utils.adapter import adapter_config, get_annotation_adapter
from pyams_utils.date import date_to_datetime
from pyams_utils.factory import factory_config
from pyams_utils.registry import utility_config
from pyams_utils.timezone import tztime
from pyams_utils.vocabulary import vocabulary_config

from pyams_scheduler import _


@vocabulary_config(name='PyAMS scheduling modes')
class SchedulingModesVocabulary(UtilityVocabulary):
    """Vocabulary of available task scheduling modes

    Registered under the name ``PyAMS scheduling modes``; its terms are built
    from the utilities registered for :class:`ITaskSchedulingMode`.
    """

    # Per zope.componentvocabulary, ``nameOnly`` makes term values the utility
    # names instead of the utility objects themselves.
    nameOnly = True
    interface = ITaskSchedulingMode
#
# Cron-style scheduling mode
#
@factory_config(ICronTaskScheduling)
class CronTaskScheduleInfo(Persistent):
    """Persistent storage of cron-style scheduling settings

    Each attribute is a :class:`FieldProperty` bound to the field of the same
    name declared on :class:`ICronTaskScheduling`.
    """

    # Activation flag and validity period
    active = FieldProperty(ICronTaskScheduling['active'])
    start_date = FieldProperty(ICronTaskScheduling['start_date'])
    end_date = FieldProperty(ICronTaskScheduling['end_date'])

    # Calendar-related cron expressions
    year = FieldProperty(ICronTaskScheduling['year'])
    month = FieldProperty(ICronTaskScheduling['month'])
    day = FieldProperty(ICronTaskScheduling['day'])
    week = FieldProperty(ICronTaskScheduling['week'])
    day_of_week = FieldProperty(ICronTaskScheduling['day_of_week'])

    # Time-related cron expressions
    hour = FieldProperty(ICronTaskScheduling['hour'])
    minute = FieldProperty(ICronTaskScheduling['minute'])
    second = FieldProperty(ICronTaskScheduling['second'])
@adapter_config(context=ICronTask, provides=ICronTaskScheduling)
def cron_task_scheduler_info_factory(context):
    """Adapt a cron task to its cron-style scheduling info

    :param context: the task being adapted (registered for :class:`ICronTask`)
    :return: the result of :func:`get_annotation_adapter` for the
        ``SCHEDULER_TASK_CRON_INFO`` key and :class:`ICronTaskScheduling`
    """
    # notify/locate are both disabled explicitly, as in every scheduling
    # info factory of this module
    scheduling_info = get_annotation_adapter(context,
                                             SCHEDULER_TASK_CRON_INFO,
                                             ICronTaskScheduling,
                                             notify=False,
                                             locate=False)
    return scheduling_info
@utility_config(name='Cron-style scheduling', provides=ITaskSchedulingMode)
class CronTaskScheduler(object):
    """Scheduling mode utility producing cron-style triggers"""

    # Interface a task must provide to be handled by this mode
    marker_interface = ICronTask
    # Interface used to adapt the task to its scheduling settings
    schema = ICronTaskScheduling

    def get_trigger(self, task):
        """Build the APScheduler cron trigger matching task settings

        :param task: the task to schedule; must provide ``marker_interface``
        :return: a :class:`CronTrigger` built from the task scheduling info
        :raises Exception: if the task doesn't provide ``marker_interface``
        """
        if not self.marker_interface.providedBy(task):
            raise Exception(_("Task is not configured for cron-style scheduling!"))
        schedule = self.schema(task)
        # Any falsy expression falls back to the '*' wildcard...
        wildcard_fields = ('year', 'month', 'day', 'week', 'day_of_week',
                           'hour', 'minute')
        trigger_args = {
            name: getattr(schedule, name) or '*'
            for name in wildcard_fields
        }
        # ...except seconds, which fall back to '0'
        trigger_args['second'] = schedule.second or '0'
        trigger_args['start_date'] = tztime(date_to_datetime(schedule.start_date))
        trigger_args['end_date'] = tztime(date_to_datetime(schedule.end_date))
        return CronTrigger(**trigger_args)
#
# Date-style scheduling mode
#
@factory_config(IDateTaskScheduling)
class DateTaskScheduleInfo(Persistent):
    """Persistent storage of date-style scheduling settings

    Each attribute is a :class:`FieldProperty` bound to the field of the same
    name declared on :class:`IDateTaskScheduling`.
    """

    # Activation flag
    active = FieldProperty(IDateTaskScheduling['active'])

    # Date used as the trigger run date
    start_date = FieldProperty(IDateTaskScheduling['start_date'])
@adapter_config(context=IDateTask, provides=IDateTaskScheduling)
def date_task_scheduler_info_factory(context):
    """Adapt a date task to its date-style scheduling info

    :param context: the task being adapted (registered for :class:`IDateTask`)
    :return: the result of :func:`get_annotation_adapter` for the
        ``SCHEDULER_TASK_DATE_INFO`` key and :class:`IDateTaskScheduling`
    """
    # notify/locate are both disabled explicitly, as in every scheduling
    # info factory of this module
    scheduling_info = get_annotation_adapter(context,
                                             SCHEDULER_TASK_DATE_INFO,
                                             IDateTaskScheduling,
                                             notify=False,
                                             locate=False)
    return scheduling_info
@utility_config(name='Date-style scheduling', provides=ITaskSchedulingMode)
class DateTaskScheduler(object):
    """Scheduling mode utility producing date-style triggers"""

    # Interface a task must provide to be handled by this mode
    marker_interface = IDateTask
    # Interface used to adapt the task to its scheduling settings
    schema = IDateTaskScheduling

    def get_trigger(self, task):
        """Build the APScheduler date trigger matching task settings

        :param task: the task to schedule; must provide ``marker_interface``
        :return: a :class:`DateTrigger` whose run date is the task start date
        :raises Exception: if the task doesn't provide ``marker_interface``
        """
        if not self.marker_interface.providedBy(task):
            raise Exception(_("Task is not configured for date-style scheduling!"))
        schedule = self.schema(task)
        # The start date is converted to a datetime, then passed through tztime()
        run_date = tztime(date_to_datetime(schedule.start_date))
        return DateTrigger(run_date=run_date)
#
# Loop-style scheduling mode
#
@factory_config(ILoopTaskScheduling)
class LoopTaskScheduleInfo(Persistent):
    """Persistent storage of loop-style scheduling settings

    Each attribute is a :class:`FieldProperty` bound to the field of the same
    name declared on :class:`ILoopTaskScheduling`.
    """

    # Activation flag and validity period
    active = FieldProperty(ILoopTaskScheduling['active'])
    start_date = FieldProperty(ILoopTaskScheduling['start_date'])
    end_date = FieldProperty(ILoopTaskScheduling['end_date'])

    # Components of the interval between two runs
    weeks = FieldProperty(ILoopTaskScheduling['weeks'])
    days = FieldProperty(ILoopTaskScheduling['days'])
    hours = FieldProperty(ILoopTaskScheduling['hours'])
    minutes = FieldProperty(ILoopTaskScheduling['minutes'])
    seconds = FieldProperty(ILoopTaskScheduling['seconds'])
@adapter_config(context=ILoopTask, provides=ILoopTaskScheduling)
def loop_task_scheduler_info_factory(context):
    """Adapt a loop task to its loop-style scheduling info

    :param context: the task being adapted (registered for :class:`ILoopTask`)
    :return: the result of :func:`get_annotation_adapter` for the
        ``SCHEDULER_TASK_LOOP_INFO`` key and :class:`ILoopTaskScheduling`
    """
    # notify/locate are both disabled explicitly, as in every scheduling
    # info factory of this module
    scheduling_info = get_annotation_adapter(context,
                                             SCHEDULER_TASK_LOOP_INFO,
                                             ILoopTaskScheduling,
                                             notify=False,
                                             locate=False)
    return scheduling_info
@utility_config(name='Loop-style scheduling', provides=ITaskSchedulingMode)
class LoopTaskScheduler(object):
    """Scheduling mode utility producing interval-based triggers"""

    # Interface a task must provide to be handled by this mode
    marker_interface = ILoopTask
    # Interface used to adapt the task to its scheduling settings
    schema = ILoopTaskScheduling

    def get_trigger(self, task):
        """Build the APScheduler interval trigger matching task settings

        :param task: the task to schedule; must provide ``marker_interface``
        :return: an :class:`IntervalTrigger` built from the task scheduling info
        :raises Exception: if the task doesn't provide ``marker_interface``
        """
        if not self.marker_interface.providedBy(task):
            raise Exception(_("Task is not configured for loop-style scheduling!"))
        schedule = self.schema(task)
        # Interval components are forwarded unchanged to the trigger
        interval_units = ('weeks', 'days', 'hours', 'minutes', 'seconds')
        trigger_args = {
            unit: getattr(schedule, unit)
            for unit in interval_units
        }
        trigger_args['start_date'] = tztime(date_to_datetime(schedule.start_date))
        trigger_args['end_date'] = tztime(date_to_datetime(schedule.end_date))
        return IntervalTrigger(**trigger_args)