Fixes a bug and add support for returning the time

* When calculating the next() or previous() valid entry, the module wouldn't "backtrack" enough, and might miss a sooner crontab entry than was found. This has been fixed.
* Added the ability to return the timestamp of the future event, instead of just the delta (pass delta=False to .next() or .previous())
* Added a method to test a given timestamp or datetime against a Crontab entry
* Special thanks to Manish Dubey github.com/mdubey for the pull request that spawned these features, provided new tests, and prompted the discovery of the bug
This commit is contained in:
Josiah Carlson 2013-02-17 17:03:52 -08:00
parent 4181061e84
commit 662bd8a947
3 changed files with 63 additions and 12 deletions

View file

@ -89,6 +89,11 @@ _increments = [
_month_incr,
lambda *a: DAY,
_year_incr,
lambda dt,x: dt.replace(minute=0),
lambda dt,x: dt.replace(hour=0),
lambda dt,x: dt.replace(day=1) if x > DAY else dt,
lambda dt,x: dt.replace(month=1) if x > DAY else dt,
lambda dt,x: dt,
]
# find the previously scheduled time
@ -116,6 +121,14 @@ def _year_decr(dt, m):
return -(YEAR + DAY)
return -YEAR
def _day_decr_reset(dt, x):
if x >= -DAY:
return dt
cur = dt.month
while dt.month == cur:
dt += DAY
return dt - DAY
_decrements = [
lambda *a: -MINUTE,
lambda *a: -HOUR,
@ -123,6 +136,11 @@ _decrements = [
_month_decr,
lambda *a: -DAY,
_year_decr,
lambda dt,x: dt.replace(minute=59),
lambda dt,x: dt.replace(hour=23),
_day_decr_reset,
lambda dt,x: dt.replace(month=12)if x < -DAY else dt,
lambda dt,x: dt,
]
Matcher = namedtuple('Matcher', 'minute, hour, day, month, weekday, year')
@ -287,7 +305,7 @@ class CronTab(object):
attr = attr() % 7
return self.matchers[index](attr, dt)
def next(self, now=None, increments=_increments):
def next(self, now=None, increments=_increments, delta=True):
'''
How long to wait in seconds before this crontab entry can next be
executed.
@ -306,17 +324,20 @@ class CronTab(object):
to_test = 0
while to_test < 6:
incr = increments[to_test]
ch = False
inc = None
while not self._test_match(to_test, future):
future += incr(future, self.matchers)
inc = incr(future, self.matchers)
future += inc
ch = True
if _test():
return None
# check for backtrack conditions
if to_test >= 3:
for tt in xrange(2, to_test):
if not self._test_match(tt, future):
# rely on the increment below to get us back to 2
to_test = 1
break
if ch:
for i in xrange(0, to_test-1):
future = increments[6+i](future, inc)
to_test = 0
continue
to_test += 1
# verify the match
@ -327,7 +348,17 @@ class CronTab(object):
"crontab: %r\n" \
"now: %r", ' '.join(m.input for m in self.matchers), now)
delay = future - now
if not delta:
delay = future - datetime.datetime(1970, 1, 1)
return delay.days * 86400 + delay.seconds + delay.microseconds / 1000000.
def previous(self, now=None):
return self.next(now, _decrements)
def previous(self, now=None, delta=True):
return self.next(now, _decrements, delta)
def test(self, entry):
if isinstance(entry, _number_types):
entry = datetime.datetime.utcfromtimestamp(entry)
for index in xrange(6):
if not self._test_match(index, entry):
return False
return True

View file

@ -7,7 +7,7 @@ with open('README') as f:
setup(
name='crontab',
version='.15',
version='.16',
description='Parse and use crontab schedules in Python',
author='Josiah Carlson',
author_email='josiah.carlson@gmail.com',

View file

@ -25,6 +25,26 @@ class TestCrontab(unittest.TestCase):
delay = ct.next(now)
assert delay is None, (crontab, delay, now, now+datetime.timedelta(seconds=delay))
def test_closest(self):
ce = CronTab("*/15 10-15 * * 1-5")
t945 = datetime.datetime(2013, 1, 1, 9, 45) # tuesday
t1245 = datetime.datetime(2013, 1, 1, 12, 45) # tuesday
s1245 = datetime.datetime(2013, 1, 5, 12, 45) # saturday
assert not ce.test(t945)
assert not ce.test(s1245)
assert ce.test(t1245)
n = datetime.datetime.utcfromtimestamp(ce.next(t945, delta=False))
assert n == datetime.datetime(2013, 1, 1, 10, 0), n
p = datetime.datetime.utcfromtimestamp(ce.previous(t945, delta=False))
assert p == datetime.datetime(2012, 12, 31, 15, 30)
n = datetime.datetime.utcfromtimestamp(ce.next(s1245, delta=False))
assert n == datetime.datetime(2013, 1, 7, 10, 0)
p = datetime.datetime.utcfromtimestamp(ce.previous(s1245, delta=False))
assert p == datetime.datetime(2013, 1, 4, 15, 45)
def test_normal(self):
self._run_test('* * * * *', 60)
self._run_test('0 * * * *', 3600)