You can now find a previously scheduled time.

This commit is contained in:
Josiah Carlson 2012-01-29 11:37:18 -08:00
parent 1ec538ff67
commit 74914494a0
3 changed files with 60 additions and 6 deletions

View file

@ -50,6 +50,7 @@ WEEK = datetime.timedelta(days=7)
MONTH = datetime.timedelta(days=28)
YEAR = datetime.timedelta(days=365)
# find the next scheduled time
def _day_incr(dt, m):
if m.day.input != 'l':
return DAY
@ -86,6 +87,40 @@ _increments = [
_year_incr,
]
# find the previously scheduled time
def _day_decr(dt, m):
if m.day.input != 'l':
return -DAY
odt = dt
ndt = dt = dt - DAY
while dt.month == ndt.month:
dt -= DAY
return dt - odt
def _month_decr(dt, m):
odt = dt
# get to the last day of last month, let the backtracking handle it
dt = dt.replace(day=1) - DAY
return dt - odt
def _year_decr(dt, m):
# simple leapyear stuff works for 1970-2099 :)
mod = dt.year % 4
if mod == 0 and (dt.month, dt.day) > (2, 29):
return -(YEAR + DAY)
if mod == 1 and (dt.month, dt.day) < (2, 29):
return -(YEAR + DAY)
return -YEAR
_decrements = [
lambda *a: -MINUTE,
lambda *a: -HOUR,
_day_decr,
_month_decr,
lambda *a: -DAY,
_year_decr,
]
Matcher = namedtuple('Matcher', 'minute, hour, day, month, weekday, year')
def _assert(condition, message, *args):
@ -112,6 +147,10 @@ class _Matcher(object):
if self.any:
return self.end < other
return all(item < other for item in self.allowed)
def __gt__(self, other):
if self.any:
return _ranges[self.which][0] > other
return all(item > other for item in self.allowed)
def _parse_crontab(self, which, entry):
'''
This parses a single crontab field and returns the data necessary for
@ -220,7 +259,7 @@ class CronTab(object):
attr = attr() % 7
return self.matchers[index](attr, dt)
def next(self, now=None):
def next(self, now=None, increments=_increments):
'''
How long to wait in seconds before this crontab entry can next be
executed.
@ -228,14 +267,20 @@ class CronTab(object):
now = now or datetime.datetime.now()
if isinstance(now, (int, long, float)):
now = datetime.datetime.fromtimestamp(now)
# get a reasonable future start time
future = now.replace(second=0, microsecond=0) + datetime.timedelta(minutes=1)
# get a reasonable future/past start time
future = now.replace(second=0, microsecond=0) + increments[0]()
if future < now:
# we are going backwards...
_test = lambda: future.year < self.matchers.year
else:
# we are going forwards
_test = lambda: self.matchers.year < future.year
to_test = 0
while to_test < 6:
incr = _increments[to_test]
incr = increments[to_test]
while not self._test_match(to_test, future):
future += incr(future, self.matchers)
if self.matchers.year < future.year:
if _test():
return None
# check for backtrack conditions
if to_test >= 3:
@ -255,3 +300,6 @@ class CronTab(object):
"now: %r", ' '.join(m.input for m in self.matchers), now)
delay = future - now
return delay.days * 86400 + delay.seconds + delay.microseconds / 1000000.
def previous(self, now=None):
return self.next(now, _decrements)

View file

@ -7,7 +7,7 @@ with open('README') as f:
setup(
name='crontab',
version='.11',
version='.12',
description='Parse and use crontab schedules in Python',
author='Josiah Carlson',
author_email='josiah.carlson@gmail.com',

View file

@ -11,6 +11,12 @@ class TestCrontab(unittest.TestCase):
assert delay is not None
dd = (crontab, delay, max_delay, now, now+datetime.timedelta(seconds=delay))
assert delay <= max_delay, dd
if not crontab.endswith(' 2099'):
delay2 = ct.previous(now + datetime.timedelta(seconds=delay))
dd = (crontab, delay, max_delay, now, now+datetime.timedelta(seconds=delay))
assert abs(delay2) >= delay, (delay, delay2)
pt = now + datetime.timedelta(seconds=delay) + datetime.timedelta(seconds=delay2)
assert pt <= now, dd
def _run_impossible(self, crontab, now):
ct = CronTab(crontab)