diff --git a/crontab/_crontab.py b/crontab/_crontab.py index 29e9286..5266fad 100644 --- a/crontab/_crontab.py +++ b/crontab/_crontab.py @@ -89,6 +89,11 @@ _increments = [ _month_incr, lambda *a: DAY, _year_incr, + lambda dt,x: dt.replace(minute=0), + lambda dt,x: dt.replace(hour=0), + lambda dt,x: dt.replace(day=1) if x > DAY else dt, + lambda dt,x: dt.replace(month=1) if x > DAY else dt, + lambda dt,x: dt, ] # find the previously scheduled time @@ -116,6 +121,14 @@ def _year_decr(dt, m): return -(YEAR + DAY) return -YEAR +def _day_decr_reset(dt, x): + if x >= -DAY: + return dt + cur = dt.month + while dt.month == cur: + dt += DAY + return dt - DAY + _decrements = [ lambda *a: -MINUTE, lambda *a: -HOUR, @@ -123,6 +136,11 @@ _decrements = [ _month_decr, lambda *a: -DAY, _year_decr, + lambda dt,x: dt.replace(minute=59), + lambda dt,x: dt.replace(hour=23), + _day_decr_reset, + lambda dt,x: dt.replace(month=12)if x < -DAY else dt, + lambda dt,x: dt, ] Matcher = namedtuple('Matcher', 'minute, hour, day, month, weekday, year') @@ -287,7 +305,7 @@ class CronTab(object): attr = attr() % 7 return self.matchers[index](attr, dt) - def next(self, now=None, increments=_increments): + def next(self, now=None, increments=_increments, delta=True): ''' How long to wait in seconds before this crontab entry can next be executed. @@ -306,17 +324,20 @@ class CronTab(object): to_test = 0 while to_test < 6: incr = increments[to_test] + ch = False + inc = None while not self._test_match(to_test, future): - future += incr(future, self.matchers) + inc = incr(future, self.matchers) + future += inc + ch = True if _test(): return None - # check for backtrack conditions - if to_test >= 3: - for tt in xrange(2, to_test): - if not self._test_match(tt, future): - # rely on the increment below to get us back to 2 - to_test = 1 - break + if ch: + for i in xrange(0, to_test-1): + future = increments[6+i](future, inc) + to_test = 0 + continue + to_test += 1 # verify the match @@ -327,7 +348,17 @@ class CronTab(object): "crontab: %r\n" \ "now: %r", ' '.join(m.input for m in self.matchers), now) delay = future - now + if not delta: + delay = future - datetime.datetime(1970, 1, 1) return delay.days * 86400 + delay.seconds + delay.microseconds / 1000000. - def previous(self, now=None): - return self.next(now, _decrements) + def previous(self, now=None, delta=True): + return self.next(now, _decrements, delta) + + def test(self, entry): + if isinstance(entry, _number_types): + entry = datetime.datetime.utcfromtimestamp(entry) + for index in xrange(6): + if not self._test_match(index, entry): + return False + return True diff --git a/setup.py b/setup.py index d037796..4fd7506 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with open('README') as f: setup( name='crontab', - version='.15', + version='.16', description='Parse and use crontab schedules in Python', author='Josiah Carlson', author_email='josiah.carlson@gmail.com', diff --git a/tests/test_crontab.py b/tests/test_crontab.py index 887e2aa..24ed5a5 100644 --- a/tests/test_crontab.py +++ b/tests/test_crontab.py @@ -25,6 +25,26 @@ class TestCrontab(unittest.TestCase): delay = ct.next(now) assert delay is None, (crontab, delay, now, now+datetime.timedelta(seconds=delay)) + def test_closest(self): + ce = CronTab("*/15 10-15 * * 1-5") + t945 = datetime.datetime(2013, 1, 1, 9, 45) # tuesday + t1245 = datetime.datetime(2013, 1, 1, 12, 45) # tuesday + s1245 = datetime.datetime(2013, 1, 5, 12, 45) # saturday + + assert not ce.test(t945) + assert not ce.test(s1245) + assert ce.test(t1245) + + n = datetime.datetime.utcfromtimestamp(ce.next(t945, delta=False)) + assert n == datetime.datetime(2013, 1, 1, 10, 0), n + p = datetime.datetime.utcfromtimestamp(ce.previous(t945, delta=False)) + assert p == datetime.datetime(2012, 12, 31, 15, 30) + + n = datetime.datetime.utcfromtimestamp(ce.next(s1245, delta=False)) + assert n == datetime.datetime(2013, 1, 7, 10, 0) + p = datetime.datetime.utcfromtimestamp(ce.previous(s1245, delta=False)) + assert p == datetime.datetime(2013, 1, 4, 15, 45) + def test_normal(self): self._run_test('* * * * *', 60) self._run_test('0 * * * *', 3600)