Fixes items like "last friday of the month"

* Bug report thanks to Tom Bech via email
This commit is contained in:
Josiah Carlson 2016-02-11 22:40:32 -08:00
parent df297e2f21
commit b7c2b5b975
3 changed files with 51 additions and 14 deletions

View file

@ -162,6 +162,7 @@ class _Matcher(object):
self.allowed = set() self.allowed = set()
self.end = None self.end = None
self.any = '*' in self.split or '?' in self.split self.any = '*' in self.split or '?' in self.split
for it in self.split: for it in self.split:
al, en = self._parse_crontab(which, it) al, en = self._parse_crontab(which, it)
if al is not None: if al is not None:
@ -170,23 +171,45 @@ class _Matcher(object):
_assert(self.end is not None, _assert(self.end is not None,
"improper item specification: %r", entry.lower() "improper item specification: %r", entry.lower()
) )
def __call__(self, v, dt): def __call__(self, v, dt):
if 'l' in self.split: for i, x in enumerate(self.split):
if x == 'l':
if v == _end_of_month(dt).day: if v == _end_of_month(dt).day:
return True return True
elif any(x.startswith('l') for x in self.split):
okay = dt.month != (dt + WEEK).month elif x.startswith('l'):
if okay and (self.any or v in self.allowed): # We have to do this in here, otherwise we can end up, for
# example, accepting *any* Friday instead of the *last* Friday.
if dt.month == (dt + WEEK).month:
continue
x = x[1:]
if x.isdigit():
x = int(x) if x != '7' else 0
if v == x:
return True return True
continue
start, end = map(int, x.partition('-')[::2])
allowed = set(range(start, end+1))
if 7 in allowed:
allowed.add(0)
if v in allowed:
return True
return self.any or v in self.allowed return self.any or v in self.allowed
def __lt__(self, other): def __lt__(self, other):
if self.any: if self.any:
return self.end < other return self.end < other
return all(item < other for item in self.allowed) return all(item < other for item in self.allowed)
def __gt__(self, other): def __gt__(self, other):
if self.any: if self.any:
return _ranges[self.which][0] > other return _ranges[self.which][0] > other
return all(item > other for item in self.allowed) return all(item > other for item in self.allowed)
def _parse_crontab(self, which, entry): def _parse_crontab(self, which, entry):
''' '''
This parses a single crontab field and returns the data necessary for This parses a single crontab field and returns the data necessary for
@ -194,6 +217,7 @@ class _Matcher(object):
See the README for information about what is accepted. See the README for information about what is accepted.
''' '''
# this handles day of week/month abbreviations # this handles day of week/month abbreviations
def _fix(it): def _fix(it):
if which in _alternate and not it.isdigit(): if which in _alternate and not it.isdigit():
@ -222,6 +246,7 @@ class _Matcher(object):
end = _end end = _end
if increment is None: if increment is None:
return set([start]) return set([start])
_assert(_start <= start <= _end_limit, _assert(_start <= start <= _end_limit,
"range start value %r out of range [%r, %r]", "range start value %r out of range [%r, %r]",
start, _start, _end_limit) start, _start, _end_limit)
@ -247,11 +272,15 @@ class _Matcher(object):
"you can only specify a bare 'L' in the 'day' field") "you can only specify a bare 'L' in the 'day' field")
return None, _end return None, _end
# last day of the week # for the last 'friday' of the month, for example
elif entry.startswith('l'): elif entry.startswith('l'):
_assert(which == 4, _assert(which == 4,
"you can only specify a leading 'L' in the 'weekday' field") "you can only specify a leading 'L' in the 'weekday' field")
entry = entry.lstrip('l') es, _, ee = entry[1:].partition('-')
_assert((entry[1:].isdigit() and 0 <= int(es) <= 7) or
(_ and es.isdigit() and ee.isdigit() and 0 <= int(es) <= 7 and 0 <= int(ee) <= 7),
"last <day> specifier must include a day number or range in the 'weekday' field, you entered %r", entry)
return None, _end
increment = None increment = None
# increments # increments
@ -266,10 +295,8 @@ class _Matcher(object):
if which == 4: if which == 4:
_end_limit = 7 _end_limit = 7
# handle all of the a,b,c and x-y,a,b entries # handle singles and ranges
good = set() good = _parse_piece(entry)
for it in entry.split(','):
good.update(_parse_piece(it))
# change Sunday to weekday 0 # change Sunday to weekday 0
if which == 4 and 7 in good: if which == 4 and 7 in good:
@ -290,10 +317,12 @@ class CronTab(object):
crontab = _aliases.get(crontab, crontab) crontab = _aliases.get(crontab, crontab)
matchers = [_Matcher(which, entry) matchers = [_Matcher(which, entry)
for which, entry in enumerate(crontab.split())] for which, entry in enumerate(crontab.split())]
if len(matchers) == 5: if len(matchers) == 5:
matchers.append(_Matcher(5, '*')) matchers.append(_Matcher(5, '*'))
_assert(len(matchers) == 6, _assert(len(matchers) == 6,
"improper number of cron entries specified") "improper number of cron entries specified")
matchers = Matcher(*matchers) matchers = Matcher(*matchers)
if not matchers.day.any: if not matchers.day.any:
_assert(matchers.weekday.any, _assert(matchers.weekday.any,

View file

@ -10,7 +10,7 @@ except:
setup( setup(
name='crontab', name='crontab',
version='0.20.4', version='0.20.5',
description='Parse and use crontab schedules in Python', description='Parse and use crontab schedules in Python',
author='Josiah Carlson', author='Josiah Carlson',
author_email='josiah.carlson@gmail.com', author_email='josiah.carlson@gmail.com',

View file

@ -113,6 +113,14 @@ class TestCrontab(unittest.TestCase):
self._run_test('0 0 ? 7 L0-1', 86400, datetime.datetime(2011, 7, 24)) self._run_test('0 0 ? 7 L0-1', 86400, datetime.datetime(2011, 7, 24))
self._run_test('0 0 ? 7 L0-1', 6*86400, datetime.datetime(2011, 7, 25)) self._run_test('0 0 ? 7 L0-1', 6*86400, datetime.datetime(2011, 7, 25))
self._run_test('59 23 L 12 *', 282*86400, datetime.datetime(2012, 3, 25), 280*84400) self._run_test('59 23 L 12 *', 282*86400, datetime.datetime(2012, 3, 25), 280*84400)
self._run_test('0 0 ? 2 L1', 28*86400, datetime.datetime(2016, 2, 1), 28*86400)
self._run_test('0 0 ? 2 L0', 27*86400, datetime.datetime(2016, 2, 1), 27*86400)
self._run_test('0 0 ? 2 L7', 27*86400, datetime.datetime(2016, 2, 1), 27*86400)
self._run_test('0 0 ? 2 L6', 26*86400, datetime.datetime(2016, 2, 1), 26*86400)
self._run_test('0 0 ? 2 L5', 25*86400, datetime.datetime(2016, 2, 1), 25*86400)
self._run_test('0 0 ? 2 L4', 24*86400, datetime.datetime(2016, 2, 1), 24*86400)
self._run_test('0 0 ? 2 L3', 23*86400, datetime.datetime(2016, 2, 1), 23*86400)
self._run_test('0 0 ? 2 L2', 22*86400, datetime.datetime(2016, 2, 1), 22*86400)
def test_impossible(self): def test_impossible(self):
self._run_impossible('0 0 * 7 fri 2011', datetime.datetime(2011, 7, 31)) self._run_impossible('0 0 * 7 fri 2011', datetime.datetime(2011, 7, 31))