897 lines
33 KiB
Python
897 lines
33 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Unit tests for gluon.scheduler
|
|
"""
|
|
import os
|
|
import unittest
|
|
import glob
|
|
import datetime
|
|
import sys
|
|
|
|
from gluon.storage import Storage
|
|
from gluon.languages import translator
|
|
from gluon.scheduler import JobGraph, Scheduler, CronParser
|
|
from gluon.dal import DAL
|
|
|
|
|
|
class BaseTestScheduler(unittest.TestCase):
|
|
def setUp(self):
|
|
self.db = None
|
|
self.cleanfolder()
|
|
from gluon.globals import current
|
|
s = Storage({'application': 'welcome',
|
|
'folder': 'applications/welcome',
|
|
'controller': 'default'})
|
|
current.request = s
|
|
T = translator('', 'en')
|
|
current.T = T
|
|
self.db = DAL('sqlite://dummy2.db', check_reserved=['all'])
|
|
|
|
def cleanfolder(self):
|
|
if self.db:
|
|
self.db.close()
|
|
try:
|
|
os.unlink('dummy2.db')
|
|
except:
|
|
pass
|
|
tfiles = glob.glob('*_scheduler*.table')
|
|
for a in tfiles:
|
|
os.unlink(a)
|
|
|
|
def tearDown(self):
|
|
self.cleanfolder()
|
|
try:
|
|
self.inner_teardown()
|
|
except:
|
|
pass
|
|
|
|
|
|
class CronParserTest(unittest.TestCase):
|
|
|
|
def testMinute(self):
|
|
# minute asterisk
|
|
base = datetime.datetime(2010, 1, 23, 12, 18)
|
|
itr = CronParser('*/1 * * * *', base)
|
|
n1 = itr.get_next() # 19
|
|
self.assertEqual(base.year, n1.year)
|
|
self.assertEqual(base.month, n1.month)
|
|
self.assertEqual(base.day, n1.day)
|
|
self.assertEqual(base.hour, n1.hour)
|
|
self.assertEqual(base.minute, n1.minute - 1)
|
|
for i in range(39): # ~ 58
|
|
itr.get_next()
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.minute, 59)
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.minute, 0)
|
|
self.assertEqual(n3.hour, 13)
|
|
|
|
itr = CronParser('*/5 * * * *', base)
|
|
n4 = itr.get_next()
|
|
self.assertEqual(n4.minute, 20)
|
|
for i in range(6):
|
|
itr.get_next()
|
|
n5 = itr.get_next()
|
|
self.assertEqual(n5.minute, 55)
|
|
n6 = itr.get_next()
|
|
self.assertEqual(n6.minute, 0)
|
|
self.assertEqual(n6.hour, 13)
|
|
|
|
base = datetime.datetime(2010, 1, 23, 12, 18)
|
|
itr = CronParser('4/34 * * * *', base)
|
|
n7 = itr.get_next()
|
|
self.assertEqual(n7.minute, 38)
|
|
self.assertEqual(n7.hour, 12)
|
|
n8 = itr.get_next()
|
|
self.assertEqual(n8.minute, 4)
|
|
self.assertEqual(n8.hour, 13)
|
|
|
|
def testHour(self):
|
|
base = datetime.datetime(2010, 1, 24, 12, 2)
|
|
itr = CronParser('0 */3 * * *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.hour, 15)
|
|
self.assertEqual(n1.minute, 0)
|
|
for i in range(2):
|
|
itr.get_next()
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.hour, 0)
|
|
self.assertEqual(n2.day, 25)
|
|
|
|
def testDay(self):
|
|
base = datetime.datetime(2010, 2, 24, 12, 9)
|
|
itr = CronParser('0 0 */3 * *', base)
|
|
n1 = itr.get_next()
|
|
# 1 4 7 10 13 16 19 22 25 28
|
|
self.assertEqual(n1.day, 25)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.day, 28)
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.day, 1)
|
|
self.assertEqual(n3.month, 3)
|
|
|
|
# test leap year
|
|
base = datetime.datetime(1996, 2, 27)
|
|
itr = CronParser('0 0 * * *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 28)
|
|
self.assertEqual(n1.month, 2)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.day, 29)
|
|
self.assertEqual(n2.month, 2)
|
|
|
|
base2 = datetime.datetime(2000, 2, 27)
|
|
itr2 = CronParser('0 0 * * *', base2)
|
|
n3 = itr2.get_next()
|
|
self.assertEqual(n3.day, 28)
|
|
self.assertEqual(n3.month, 2)
|
|
n4 = itr2.get_next()
|
|
self.assertEqual(n4.day, 29)
|
|
self.assertEqual(n4.month, 2)
|
|
|
|
def testWeekDay(self):
|
|
base = datetime.datetime(2010, 2, 25)
|
|
itr = CronParser('0 0 * * sat', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.isoweekday(), 6)
|
|
self.assertEqual(n1.day, 27)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.isoweekday(), 6)
|
|
self.assertEqual(n2.day, 6)
|
|
self.assertEqual(n2.month, 3)
|
|
|
|
base = datetime.datetime(2010, 1, 25)
|
|
itr = CronParser('0 0 1 * wed', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.month, 1)
|
|
self.assertEqual(n1.day, 27)
|
|
self.assertEqual(n1.year, 2010)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.month, 2)
|
|
self.assertEqual(n2.day, 1)
|
|
self.assertEqual(n2.year, 2010)
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.month, 2)
|
|
self.assertEqual(n3.day, 3)
|
|
self.assertEqual(n3.year, 2010)
|
|
|
|
def testMonth(self):
|
|
base = datetime.datetime(2010, 1, 25)
|
|
itr = CronParser('0 0 1 * *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.month, 2)
|
|
self.assertEqual(n1.day, 1)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.month, 3)
|
|
self.assertEqual(n2.day, 1)
|
|
for i in range(8):
|
|
itr.get_next()
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.month, 12)
|
|
self.assertEqual(n3.year, 2010)
|
|
n4 = itr.get_next()
|
|
self.assertEqual(n4.month, 1)
|
|
self.assertEqual(n4.year, 2011)
|
|
|
|
base = datetime.datetime(2010, 1, 25)
|
|
itr = CronParser('0 0 1 */4 *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.month, 5)
|
|
self.assertEqual(n1.day, 1)
|
|
|
|
base = datetime.datetime(2010, 1, 25)
|
|
itr = CronParser('0 0 1 1-3 *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.month, 2)
|
|
self.assertEqual(n1.day, 1)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.month, 3)
|
|
self.assertEqual(n2.day, 1)
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.month, 1)
|
|
self.assertEqual(n3.day, 1)
|
|
|
|
def testSundayToThursdayWithAlphaConversion(self):
|
|
base = datetime.datetime(2010, 8, 25, 15, 56)
|
|
itr = CronParser("30 22 * * sun-thu", base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(base.year, n1.year)
|
|
self.assertEqual(base.month, n1.month)
|
|
self.assertEqual(base.day, n1.day)
|
|
self.assertEqual(22, n1.hour)
|
|
self.assertEqual(30, n1.minute)
|
|
|
|
def testISOWeekday(self):
|
|
base = datetime.datetime(2010, 2, 25)
|
|
itr = CronParser('0 0 * * 7', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.isoweekday(), 7)
|
|
self.assertEqual(n1.day, 28)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.isoweekday(), 7)
|
|
self.assertEqual(n2.day, 7)
|
|
self.assertEqual(n2.month, 3)
|
|
base = datetime.datetime(2010, 2, 22)
|
|
itr = CronParser('0 0 * * */2', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.isoweekday(), 2)
|
|
self.assertEqual(n1.day, 23)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.isoweekday(), 4)
|
|
self.assertEqual(n2.day, 25)
|
|
|
|
def testBug2(self):
|
|
|
|
base = datetime.datetime(2012, 1, 1, 0, 0)
|
|
itr = CronParser('0 * * 3 *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.year, base.year)
|
|
self.assertEqual(n1.month, 3)
|
|
self.assertEqual(n1.day, base.day)
|
|
self.assertEqual(n1.hour, base.hour)
|
|
self.assertEqual(n1.minute, base.minute)
|
|
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.year, base.year)
|
|
self.assertEqual(n2.month, 3)
|
|
self.assertEqual(n2.day, base.day)
|
|
self.assertEqual(n2.hour, base.hour + 1)
|
|
self.assertEqual(n2.minute, base.minute)
|
|
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.year, base.year)
|
|
self.assertEqual(n3.month, 3)
|
|
self.assertEqual(n3.day, base.day)
|
|
self.assertEqual(n3.hour, base.hour + 2)
|
|
self.assertEqual(n3.minute, base.minute)
|
|
|
|
def testBug3(self):
|
|
base = datetime.datetime(2013, 3, 1, 12, 17, 34, 257877)
|
|
c = CronParser('00 03 16,30 * *', base)
|
|
|
|
n1 = c.get_next()
|
|
self.assertEqual(n1.month, 3)
|
|
self.assertEqual(n1.day, 16)
|
|
|
|
n2 = c.get_next()
|
|
self.assertEqual(n2.month, 3)
|
|
self.assertEqual(n2.day, 30)
|
|
|
|
n3 = c.get_next()
|
|
self.assertEqual(n3.month, 4)
|
|
self.assertEqual(n3.day, 16)
|
|
|
|
def test_rangeGenerator(self):
|
|
base = datetime.datetime(2013, 3, 4, 0, 0)
|
|
itr = CronParser('1-9/2 0 1 * *', base)
|
|
n1 = itr.get_next()
|
|
n2 = itr.get_next()
|
|
n3 = itr.get_next()
|
|
n4 = itr.get_next()
|
|
n5 = itr.get_next()
|
|
self.assertEqual(n1.minute, 1)
|
|
self.assertEqual(n2.minute, 3)
|
|
self.assertEqual(n3.minute, 5)
|
|
self.assertEqual(n4.minute, 7)
|
|
self.assertEqual(n5.minute, 9)
|
|
|
|
def test_iterGenerator(self):
|
|
base = datetime.datetime(2013, 3, 4, 0, 0)
|
|
itr = CronParser('1-9/2 0 1 * *', base)
|
|
x = 0
|
|
for n in itr:
|
|
x += 1
|
|
if x > 4:
|
|
break
|
|
self.assertEqual(n.minute, 9)
|
|
|
|
def test_invalidcron(self):
|
|
base = datetime.datetime(2013, 3, 4, 0, 0)
|
|
itr = CronParser('5 4 31 2 *', base)
|
|
self.assertRaises(ValueError, itr.get_next)
|
|
itr = CronParser('* * 5-1 * *', base)
|
|
self.assertRaises(ValueError, itr.get_next)
|
|
itr = CronParser('* * * janu-jun *', base)
|
|
self.assertRaises(KeyError, itr.get_next)
|
|
itr = CronParser('* * * * * *', base)
|
|
self.assertRaises(ValueError, itr.get_next)
|
|
itr = CronParser('* * * *', base)
|
|
self.assertRaises(ValueError, itr.get_next)
|
|
|
|
def testLastDayOfMonth(self):
|
|
base = datetime.datetime(2015, 9, 4)
|
|
itr = CronParser('0 0 L * *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.month, 9)
|
|
self.assertEqual(n1.day, 30)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.month, 10)
|
|
self.assertEqual(n2.day, 31)
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.month, 11)
|
|
self.assertEqual(n3.day, 30)
|
|
n4 = itr.get_next()
|
|
self.assertEqual(n4.month, 12)
|
|
self.assertEqual(n4.day, 31)
|
|
|
|
base = datetime.datetime(1996, 2, 27)
|
|
itr = CronParser('0 0 L * *', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 29)
|
|
self.assertEqual(n1.month, 2)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.day, 31)
|
|
self.assertEqual(n2.month, 3)
|
|
|
|
def testSpecialExpr(self):
|
|
base = datetime.datetime(2000, 1, 1)
|
|
itr = CronParser('@yearly', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 1)
|
|
self.assertEqual(n1.month, 1)
|
|
self.assertEqual(n1.year, base.year + 1)
|
|
self.assertEqual(n1.hour, 0)
|
|
self.assertEqual(n1.minute, 0)
|
|
|
|
itr = CronParser('@annually', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 1)
|
|
self.assertEqual(n1.month, 1)
|
|
self.assertEqual(n1.year, base.year + 1)
|
|
self.assertEqual(n1.hour, 0)
|
|
self.assertEqual(n1.minute, 0)
|
|
|
|
itr = CronParser('@monthly', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 1)
|
|
self.assertEqual(n1.month, base.month + 1)
|
|
self.assertEqual(n1.year, base.year)
|
|
self.assertEqual(n1.hour, 0)
|
|
self.assertEqual(n1.minute, 0)
|
|
|
|
itr = CronParser('@weekly', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 2)
|
|
self.assertEqual(n1.month, base.month)
|
|
self.assertEqual(n1.year, base.year)
|
|
self.assertEqual(n1.hour, 0)
|
|
self.assertEqual(n1.minute, 0)
|
|
n2 = itr.get_next()
|
|
self.assertEqual(n2.day, 9)
|
|
self.assertEqual(n2.month, base.month)
|
|
self.assertEqual(n2.year, base.year)
|
|
self.assertEqual(n2.hour, 0)
|
|
self.assertEqual(n2.minute, 0)
|
|
n3 = itr.get_next()
|
|
self.assertEqual(n3.day, 16)
|
|
self.assertEqual(n3.month, base.month)
|
|
self.assertEqual(n3.year, base.year)
|
|
self.assertEqual(n3.hour, 0)
|
|
self.assertEqual(n3.minute, 0)
|
|
|
|
itr = CronParser('@daily', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 2)
|
|
self.assertEqual(n1.month, base.month)
|
|
self.assertEqual(n1.year, base.year)
|
|
self.assertEqual(n1.hour, 0)
|
|
self.assertEqual(n1.minute, 0)
|
|
|
|
itr = CronParser('@midnight', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 2)
|
|
self.assertEqual(n1.month, base.month)
|
|
self.assertEqual(n1.year, base.year)
|
|
self.assertEqual(n1.hour, 0)
|
|
self.assertEqual(n1.minute, 0)
|
|
|
|
itr = CronParser('@hourly', base)
|
|
n1 = itr.get_next()
|
|
self.assertEqual(n1.day, 1)
|
|
self.assertEqual(n1.month, base.month)
|
|
self.assertEqual(n1.year, base.year)
|
|
self.assertEqual(n1.hour, 1)
|
|
self.assertEqual(n1.minute, 0)
|
|
|
|
|
|
|
|
class TestsForJobGraph(BaseTestScheduler):
|
|
|
|
def testJobGraph(self):
|
|
s = Scheduler(self.db)
|
|
myjob = JobGraph(self.db, 'job_1')
|
|
fname = 'foo'
|
|
# We have a few items to wear, and there's an "order" to respect...
|
|
# Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
|
|
# Now, we can't put on the tie without wearing the shirt first, etc...
|
|
watch = s.queue_task(fname, task_name='watch')
|
|
jacket = s.queue_task(fname, task_name='jacket')
|
|
shirt = s.queue_task(fname, task_name='shirt')
|
|
tie = s.queue_task(fname, task_name='tie')
|
|
pants = s.queue_task(fname, task_name='pants')
|
|
undershorts = s.queue_task(fname, task_name='undershorts')
|
|
belt = s.queue_task(fname, task_name='belt')
|
|
shoes = s.queue_task(fname, task_name='shoes')
|
|
socks = s.queue_task(fname, task_name='socks')
|
|
# before the tie, comes the shirt
|
|
myjob.add_deps(tie.id, shirt.id)
|
|
# before the belt too comes the shirt
|
|
myjob.add_deps(belt.id, shirt.id)
|
|
# before the jacket, comes the tie
|
|
myjob.add_deps(jacket.id, tie.id)
|
|
# before the belt, come the pants
|
|
myjob.add_deps(belt.id, pants.id)
|
|
# before the shoes, comes the pants
|
|
myjob.add_deps(shoes.id, pants.id)
|
|
# before the pants, comes the undershorts
|
|
myjob.add_deps(pants.id, undershorts.id)
|
|
# before the shoes, comes the undershorts
|
|
myjob.add_deps(shoes.id, undershorts.id)
|
|
# before the jacket, comes the belt
|
|
myjob.add_deps(jacket.id, belt.id)
|
|
# before the shoes, comes the socks
|
|
myjob.add_deps(shoes.id, socks.id)
|
|
|
|
## results in the following topological sort
|
|
# 9,3,6 --> 4,5 --> 8,7 --> 2
|
|
# socks, shirt, undershorts
|
|
# tie, pants
|
|
# shoes, belt
|
|
# jacket
|
|
known_toposort = [
|
|
set([socks.id, shirt.id, undershorts.id]),
|
|
set([tie.id, pants.id]),
|
|
set([shoes.id, belt.id]),
|
|
set([jacket.id])
|
|
]
|
|
toposort = myjob.validate('job_1')
|
|
self.assertEqual(toposort, known_toposort)
|
|
# add a cyclic dependency, jacket to undershorts
|
|
myjob.add_deps(undershorts.id, jacket.id)
|
|
# no exceptions raised, but result None
|
|
self.assertEqual(myjob.validate('job_1'), None)
|
|
|
|
def testJobGraphFailing(self):
|
|
s = Scheduler(self.db)
|
|
myjob = JobGraph(self.db, 'job_1')
|
|
fname = 'foo'
|
|
# We have a few items to wear, and there's an "order" to respect...
|
|
# Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
|
|
# Now, we can't put on the tie without wearing the shirt first, etc...
|
|
watch = s.queue_task(fname, task_name='watch')
|
|
jacket = s.queue_task(fname, task_name='jacket')
|
|
shirt = s.queue_task(fname, task_name='shirt')
|
|
tie = s.queue_task(fname, task_name='tie')
|
|
pants = s.queue_task(fname, task_name='pants')
|
|
undershorts = s.queue_task(fname, task_name='undershorts')
|
|
belt = s.queue_task(fname, task_name='belt')
|
|
shoes = s.queue_task(fname, task_name='shoes')
|
|
socks = s.queue_task(fname, task_name='socks')
|
|
# before the tie, comes the shirt
|
|
myjob.add_deps(tie.id, shirt.id)
|
|
# before the belt too comes the shirt
|
|
myjob.add_deps(belt.id, shirt.id)
|
|
# before the jacket, comes the tie
|
|
myjob.add_deps(jacket.id, tie.id)
|
|
# before the belt, come the pants
|
|
myjob.add_deps(belt.id, pants.id)
|
|
# before the shoes, comes the pants
|
|
myjob.add_deps(shoes.id, pants.id)
|
|
# before the pants, comes the undershorts
|
|
myjob.add_deps(pants.id, undershorts.id)
|
|
# before the shoes, comes the undershorts
|
|
myjob.add_deps(shoes.id, undershorts.id)
|
|
# before the jacket, comes the belt
|
|
myjob.add_deps(jacket.id, belt.id)
|
|
# before the shoes, comes the socks
|
|
myjob.add_deps(shoes.id, socks.id)
|
|
# add a cyclic dependency, jacket to undershorts
|
|
myjob.add_deps(undershorts.id, jacket.id)
|
|
# no exceptions raised, but result None
|
|
self.assertEqual(myjob.validate('job_1'), None)
|
|
# and no deps added
|
|
deps_inserted = self.db(self.db.scheduler_task_deps.id>0).count()
|
|
self.assertEqual(deps_inserted, 0)
|
|
|
|
def testJobGraphDifferentJobs(self):
|
|
s = Scheduler(self.db)
|
|
myjob1 = JobGraph(self.db, 'job_1')
|
|
myjob2 = JobGraph(self.db, 'job_2')
|
|
fname = 'foo'
|
|
# We have a few items to wear, and there's an "order" to respect...
|
|
# Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
|
|
# Now, we can't put on the tie without wearing the shirt first, etc...
|
|
watch = s.queue_task(fname, task_name='watch')
|
|
jacket = s.queue_task(fname, task_name='jacket')
|
|
shirt = s.queue_task(fname, task_name='shirt')
|
|
tie = s.queue_task(fname, task_name='tie')
|
|
pants = s.queue_task(fname, task_name='pants')
|
|
undershorts = s.queue_task(fname, task_name='undershorts')
|
|
belt = s.queue_task(fname, task_name='belt')
|
|
shoes = s.queue_task(fname, task_name='shoes')
|
|
socks = s.queue_task(fname, task_name='socks')
|
|
# before the tie, comes the shirt
|
|
myjob1.add_deps(tie.id, shirt.id)
|
|
# before the belt too comes the shirt
|
|
myjob1.add_deps(belt.id, shirt.id)
|
|
# before the jacket, comes the tie
|
|
myjob1.add_deps(jacket.id, tie.id)
|
|
# before the belt, come the pants
|
|
myjob1.add_deps(belt.id, pants.id)
|
|
# before the shoes, comes the pants
|
|
myjob2.add_deps(shoes.id, pants.id)
|
|
# before the pants, comes the undershorts
|
|
myjob2.add_deps(pants.id, undershorts.id)
|
|
# before the shoes, comes the undershorts
|
|
myjob2.add_deps(shoes.id, undershorts.id)
|
|
# before the jacket, comes the belt
|
|
myjob2.add_deps(jacket.id, belt.id)
|
|
# before the shoes, comes the socks
|
|
myjob2.add_deps(shoes.id, socks.id)
|
|
# every job by itself can be completed
|
|
self.assertNotEqual(myjob1.validate('job_1'), None)
|
|
self.assertNotEqual(myjob1.validate('job_2'), None)
|
|
# and, implicitly, every queued task can be too
|
|
self.assertNotEqual(myjob1.validate(), None)
|
|
# add a cyclic dependency, jacket to undershorts
|
|
myjob2.add_deps(undershorts.id, jacket.id)
|
|
# every job can still be completed by itself
|
|
self.assertNotEqual(myjob1.validate('job_1'), None)
|
|
self.assertNotEqual(myjob1.validate('job_2'), None)
|
|
# but trying to see if every task will ever be completed fails
|
|
self.assertEqual(myjob2.validate(), None)
|
|
|
|
|
|
class TestsForSchedulerAPIs(BaseTestScheduler):
|
|
|
|
def testQueue_Task(self):
|
|
|
|
def isnotqueued(result):
|
|
self.assertEqual(result.id, None)
|
|
self.assertEqual(result.uuid, None)
|
|
self.assertEqual(len(result.errors.keys()) > 0, True)
|
|
|
|
def isqueued(result):
|
|
self.assertNotEqual(result.id, None)
|
|
self.assertNotEqual(result.uuid, None)
|
|
self.assertEqual(len(result.errors.keys()), 0)
|
|
|
|
s = Scheduler(self.db)
|
|
fname = 'foo'
|
|
watch = s.queue_task(fname, task_name='watch')
|
|
# queuing a task returns id, errors, uuid
|
|
self.assertEqual(set(watch.keys()), set(['id', 'uuid', 'errors']))
|
|
# queueing nothing isn't allowed
|
|
self.assertRaises(TypeError, s.queue_task, *[])
|
|
# passing pargs and pvars wrongly
|
|
# # pargs as dict
|
|
isnotqueued(s.queue_task(fname, dict(a=1), dict(b=1)))
|
|
# # pvars as list
|
|
isnotqueued(s.queue_task(fname, ['foo', 'bar'], ['foo', 'bar']))
|
|
# two tasks with the same uuid won't be there
|
|
isqueued(s.queue_task(fname, uuid='a'))
|
|
isnotqueued(s.queue_task(fname, uuid='a'))
|
|
# # #FIXME add here every parameter
|
|
|
|
def testTask_Status(self):
|
|
s = Scheduler(self.db)
|
|
fname = 'foo'
|
|
watch = s.queue_task(fname, task_name='watch')
|
|
# fetch status by id
|
|
by_id = s.task_status(watch.id)
|
|
# fetch status by uuid
|
|
by_uuid = s.task_status(watch.uuid)
|
|
# fetch status by query
|
|
by_query = s.task_status(self.db.scheduler_task.function_name == 'foo')
|
|
self.assertEqual(by_id, by_uuid)
|
|
self.assertEqual(by_id, by_query)
|
|
# fetch status by anything else throws
|
|
self.assertRaises(SyntaxError, s.task_status, *[[1, 2]])
|
|
# adding output returns the joined set, plus "result"
|
|
rtn = s.task_status(watch.id, output=True)
|
|
self.assertEqual(set(rtn.keys()), set(['scheduler_run', 'scheduler_task', 'result']))
|
|
|
|
|
|
class testForSchedulerRunnerBase(BaseTestScheduler):
|
|
|
|
def inner_teardown(self):
|
|
from gluon import current
|
|
fdest = os.path.join(current.request.folder, 'models', 'scheduler.py')
|
|
os.unlink(fdest)
|
|
additional_files = [
|
|
os.path.join(current.request.folder, 'private', 'demo8.pholder'),
|
|
os.path.join(current.request.folder, 'views', 'issue_1485_2.html'),
|
|
]
|
|
for f in additional_files:
|
|
try:
|
|
os.unlink(f)
|
|
except:
|
|
pass
|
|
|
|
def writeview(self, content, dest=None):
|
|
from gluon import current
|
|
fdest = os.path.join(current.request.folder, 'views', dest)
|
|
with open(fdest, 'w') as q:
|
|
q.write(content)
|
|
|
|
def writefunction(self, content, initlines=None):
|
|
from gluon import current
|
|
fdest = os.path.join(current.request.folder, 'models', 'scheduler.py')
|
|
if initlines is None:
|
|
initlines = """
|
|
import os
|
|
import time
|
|
from gluon.scheduler import Scheduler
|
|
db_dal = os.path.abspath(os.path.join(request.folder, '..', '..', 'dummy2.db'))
|
|
sched_dal = DAL('sqlite://%s' % db_dal, folder=os.path.dirname(db_dal))
|
|
sched = Scheduler(sched_dal, max_empty_runs=15, migrate=False, heartbeat=1)
|
|
def termination():
|
|
sched.terminate()
|
|
sched_dal.commit()
|
|
"""
|
|
with open(fdest, 'w') as q:
|
|
q.write(initlines)
|
|
q.write(content)
|
|
|
|
def exec_sched(self):
|
|
import subprocess
|
|
call_args = [sys.executable, 'web2py.py', '--no-banner', '-D', '20','-K', 'welcome']
|
|
ret = subprocess.call(call_args, env=dict(os.environ))
|
|
return ret
|
|
|
|
def fetch_results(self, sched, task):
|
|
info = sched.task_status(task.id)
|
|
task_runs = self.db(self.db.scheduler_run.task_id == task.id).select()
|
|
return info, task_runs
|
|
|
|
def exec_asserts(self, stmts, tag):
|
|
for stmt in stmts:
|
|
self.assertEqual(stmt[1], True, msg="%s - %s" % (tag, stmt[0]))
|
|
|
|
|
|
class TestsForSchedulerRunner(testForSchedulerRunnerBase):
|
|
|
|
def testRepeats_and_Expired_and_Prio(self):
|
|
s = Scheduler(self.db)
|
|
repeats = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), repeats=2, period=5)
|
|
a_while_ago = datetime.datetime.now() - datetime.timedelta(seconds=60)
|
|
expired = s.queue_task('demo4', stop_time=a_while_ago)
|
|
prio1 = s.queue_task('demo1', ['scheduled_first'])
|
|
prio2 = s.queue_task('demo1', ['scheduled_second'], next_run_time=a_while_ago)
|
|
self.db.commit()
|
|
self.writefunction(r"""
|
|
def demo1(*args,**vars):
|
|
print('you passed args=%s and vars=%s' % (args, vars))
|
|
return args[0]
|
|
|
|
def demo4():
|
|
time.sleep(15)
|
|
print("I'm printing something")
|
|
return dict(a=1, b=2)
|
|
""")
|
|
ret = self.exec_sched()
|
|
self.assertEqual(ret, 0)
|
|
# repeats check
|
|
task, task_run = self.fetch_results(s, repeats)
|
|
res = [
|
|
("task status completed", task.status == 'COMPLETED'),
|
|
("task times_run is 2", task.times_run == 2),
|
|
("task ran 2 times only", len(task_run) == 2),
|
|
("scheduler_run records are COMPLETED ", (task_run[0].status == task_run[1].status == 'COMPLETED')),
|
|
("period is respected", (task_run[1].start_time > task_run[0].start_time + datetime.timedelta(seconds=task.period)))
|
|
]
|
|
self.exec_asserts(res, 'REPEATS')
|
|
|
|
# expired check
|
|
task, task_run = self.fetch_results(s, expired)
|
|
res = [
|
|
("task status expired", task.status == 'EXPIRED'),
|
|
("task times_run is 0", task.times_run == 0),
|
|
("task didn't run at all", len(task_run) == 0)
|
|
]
|
|
self.exec_asserts(res, 'EXPIRATION')
|
|
|
|
# prio check
|
|
task1 = s.task_status(prio1.id, output=True)
|
|
task2 = s.task_status(prio2.id, output=True)
|
|
res = [
|
|
("tasks status completed", task1.scheduler_task.status == task2.scheduler_task.status == 'COMPLETED'),
|
|
("priority2 was executed before priority1" , task1.scheduler_run.id > task2.scheduler_run.id)
|
|
]
|
|
self.exec_asserts(res, 'PRIORITY')
|
|
|
|
def testNoReturn_and_Timeout_and_Progress(self):
|
|
s = Scheduler(self.db)
|
|
noret1 = s.queue_task('demo5')
|
|
noret2 = s.queue_task('demo3')
|
|
timeout1 = s.queue_task('demo4', timeout=5)
|
|
timeout2 = s.queue_task('demo4')
|
|
progress = s.queue_task('demo6', sync_output=2)
|
|
termination = s.queue_task('termination')
|
|
self.db.commit()
|
|
self.writefunction(r"""
|
|
def demo3():
|
|
time.sleep(3)
|
|
print(1/0)
|
|
return None
|
|
|
|
def demo4():
|
|
time.sleep(15)
|
|
print("I'm printing something")
|
|
return dict(a=1, b=2)
|
|
|
|
def demo5():
|
|
time.sleep(3)
|
|
print("I'm printing something")
|
|
rtn = dict(a=1, b=2)
|
|
|
|
def demo6():
|
|
time.sleep(5)
|
|
print('50%')
|
|
time.sleep(5)
|
|
print('!clear!100%')
|
|
return 1
|
|
""")
|
|
ret = self.exec_sched()
|
|
self.assertEqual(ret, 0)
|
|
# noreturn check
|
|
task1, task_run1 = self.fetch_results(s, noret1)
|
|
task2, task_run2 = self.fetch_results(s, noret2)
|
|
res = [
|
|
("tasks no_returns1 completed", task1.status == 'COMPLETED'),
|
|
("tasks no_returns2 failed", task2.status == 'FAILED'),
|
|
("no_returns1 doesn't have a scheduler_run record", len(task_run1) == 0),
|
|
("no_returns2 has a scheduler_run record FAILED", (len(task_run2) == 1 and task_run2[0].status == 'FAILED')),
|
|
]
|
|
self.exec_asserts(res, 'NO_RETURN')
|
|
|
|
# timeout check
|
|
task1 = s.task_status(timeout1.id, output=True)
|
|
task2 = s.task_status(timeout2.id, output=True)
|
|
res = [
|
|
("tasks timeouts1 timeoutted", task1.scheduler_task.status == 'TIMEOUT'),
|
|
("tasks timeouts2 completed", task2.scheduler_task.status == 'COMPLETED')
|
|
]
|
|
self.exec_asserts(res, 'TIMEOUT')
|
|
|
|
# progress check
|
|
task1 = s.task_status(progress.id, output=True)
|
|
res = [
|
|
("tasks percentages completed", task1.scheduler_task.status == 'COMPLETED'),
|
|
("output contains only 100%", task1.scheduler_run.run_output.strip() == "100%")
|
|
]
|
|
self.exec_asserts(res, 'PROGRESS')
|
|
|
|
def testDrift_and_env_and_immediate(self):
|
|
s = Scheduler(self.db)
|
|
immediate = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), immediate=True)
|
|
env = s.queue_task('demo7')
|
|
drift = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), period=93, prevent_drift=True)
|
|
termination = s.queue_task('termination')
|
|
self.db.commit()
|
|
self.writefunction(r"""
|
|
def demo1(*args,**vars):
|
|
print('you passed args=%s and vars=%s' % (args, vars))
|
|
return args[0]
|
|
import random
|
|
def demo7():
|
|
time.sleep(random.randint(1,5))
|
|
print(W2P_TASK, request.now)
|
|
return W2P_TASK.id, W2P_TASK.uuid, W2P_TASK.run_id
|
|
""")
|
|
ret = self.exec_sched()
|
|
self.assertEqual(ret, 0)
|
|
# immediate check, can only check that nothing breaks
|
|
task1 = s.task_status(immediate.id)
|
|
res = [
|
|
("tasks status completed", task1.status == 'COMPLETED'),
|
|
]
|
|
self.exec_asserts(res, 'IMMEDIATE')
|
|
|
|
# drift check
|
|
task, task_run = self.fetch_results(s, drift)
|
|
res = [
|
|
("task status completed", task.status == 'COMPLETED'),
|
|
("next_run_time is exactly start_time + period", (task.next_run_time == task.start_time + datetime.timedelta(seconds=task.period)))
|
|
]
|
|
self.exec_asserts(res, 'DRIFT')
|
|
|
|
# env check
|
|
task1 = s.task_status(env.id, output=True)
|
|
res = [
|
|
("task %s returned W2P_TASK correctly" % (task1.scheduler_task.id), task1.result == [task1.scheduler_task.id, task1.scheduler_task.uuid, task1.scheduler_run.id]),
|
|
]
|
|
self.exec_asserts(res, 'ENV')
|
|
|
|
|
|
def testRetryFailed(self):
|
|
s = Scheduler(self.db)
|
|
failed = s.queue_task('demo2', retry_failed=1, period=1)
|
|
failed_consecutive = s.queue_task('demo8', retry_failed=2, repeats=2, period=1)
|
|
self.db.commit()
|
|
self.writefunction(r"""
|
|
def demo2():
|
|
1/0
|
|
|
|
def demo8():
|
|
placeholder = os.path.join(request.folder, 'private', 'demo8.pholder')
|
|
with open(placeholder, 'a') as g:
|
|
g.write('\nplaceholder for demo8 created')
|
|
num_of_lines = 0
|
|
with open(placeholder) as f:
|
|
num_of_lines = len([a for a in f.read().split('\n') if a])
|
|
print('number of lines', num_of_lines)
|
|
if num_of_lines <= 2:
|
|
1/0
|
|
else:
|
|
os.unlink(placeholder)
|
|
return 1
|
|
""")
|
|
ret = self.exec_sched()
|
|
# process finished just fine
|
|
self.assertEqual(ret, 0)
|
|
# failed - checks
|
|
task, task_run = self.fetch_results(s, failed)
|
|
res = [
|
|
("task status failed", task.status == 'FAILED'),
|
|
("task times_run is 0", task.times_run == 0),
|
|
("task times_failed is 2", task.times_failed == 2),
|
|
("task ran 2 times only", len(task_run) == 2),
|
|
("scheduler_run records are FAILED", (task_run[0].status == task_run[1].status == 'FAILED')),
|
|
("period is respected", (task_run[1].start_time > task_run[0].start_time + datetime.timedelta(seconds=task.period)))
|
|
]
|
|
self.exec_asserts(res, 'FAILED')
|
|
|
|
# failed consecutive - checks
|
|
task, task_run = self.fetch_results(s, failed_consecutive)
|
|
res = [
|
|
("task status completed", task.status == 'COMPLETED'),
|
|
("task times_run is 2", task.times_run == 2),
|
|
("task times_failed is 0", task.times_failed == 0),
|
|
("task ran 6 times", len(task_run) == 6),
|
|
("scheduler_run records for COMPLETED is 2", len([run.status for run in task_run if run.status == 'COMPLETED']) == 2),
|
|
("scheduler_run records for FAILED is 4", len([run.status for run in task_run if run.status == 'FAILED']) == 4),
|
|
]
|
|
self.exec_asserts(res, 'FAILED_CONSECUTIVE')
|
|
|
|
def testRegressions(self):
|
|
s = Scheduler(self.db)
|
|
huge_result = s.queue_task('demo10', retry_failed=1, period=1)
|
|
issue_1485 = s.queue_task('issue_1485')
|
|
termination = s.queue_task('termination')
|
|
self.db.commit()
|
|
self.writefunction(r"""
|
|
def demo10():
|
|
res = 'a' * 99999
|
|
return dict(res=res)
|
|
|
|
def issue_1485():
|
|
return response.render('issue_1485.html', dict(variable='abc'))
|
|
""")
|
|
self.writeview(r"""<span>{{=variable}}</span>""", 'issue_1485.html')
|
|
ret = self.exec_sched()
|
|
# process finished just fine
|
|
self.assertEqual(ret, 0)
|
|
# huge_result - checks
|
|
task_huge = s.task_status(huge_result.id, output=True)
|
|
res = [
|
|
("task status completed", task_huge.scheduler_task.status == 'COMPLETED'),
|
|
("task times_run is 1", task_huge.scheduler_task.times_run == 1),
|
|
("result is the correct one", task_huge.result == dict(res='a' * 99999))
|
|
]
|
|
self.exec_asserts(res, 'HUGE_RESULT')
|
|
|
|
task_issue_1485 = s.task_status(issue_1485.id, output=True)
|
|
res = [
|
|
("task status completed", task_issue_1485.scheduler_task.status == 'COMPLETED'),
|
|
("task times_run is 1", task_issue_1485.scheduler_task.times_run == 1),
|
|
("result is the correct one", task_issue_1485.result == '<span>abc</span>')
|
|
]
|
|
self.exec_asserts(res, 'issue_1485')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|