#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Unit tests for gluon.scheduler
"""
import os
import unittest
import glob
import datetime
import sys
from gluon.storage import Storage
from gluon.languages import translator
from gluon.scheduler import JobGraph, Scheduler, CronParser
from gluon.dal import DAL
class BaseTestScheduler(unittest.TestCase):
def setUp(self):
self.db = None
self.cleanfolder()
from gluon.globals import current
s = Storage({'application': 'welcome',
'folder': 'applications/welcome',
'controller': 'default'})
current.request = s
T = translator('', 'en')
current.T = T
self.db = DAL('sqlite://dummy2.db', check_reserved=['all'])
def cleanfolder(self):
if self.db:
self.db.close()
try:
os.unlink('dummy2.db')
except:
pass
tfiles = glob.glob('*_scheduler*.table')
for a in tfiles:
os.unlink(a)
def tearDown(self):
self.cleanfolder()
try:
self.inner_teardown()
except:
pass
class CronParserTest(unittest.TestCase):
def testMinute(self):
# minute asterisk
base = datetime.datetime(2010, 1, 23, 12, 18)
itr = CronParser('*/1 * * * *', base)
n1 = itr.get_next() # 19
self.assertEqual(base.year, n1.year)
self.assertEqual(base.month, n1.month)
self.assertEqual(base.day, n1.day)
self.assertEqual(base.hour, n1.hour)
self.assertEqual(base.minute, n1.minute - 1)
for i in range(39): # ~ 58
itr.get_next()
n2 = itr.get_next()
self.assertEqual(n2.minute, 59)
n3 = itr.get_next()
self.assertEqual(n3.minute, 0)
self.assertEqual(n3.hour, 13)
itr = CronParser('*/5 * * * *', base)
n4 = itr.get_next()
self.assertEqual(n4.minute, 20)
for i in range(6):
itr.get_next()
n5 = itr.get_next()
self.assertEqual(n5.minute, 55)
n6 = itr.get_next()
self.assertEqual(n6.minute, 0)
self.assertEqual(n6.hour, 13)
base = datetime.datetime(2010, 1, 23, 12, 18)
itr = CronParser('4/34 * * * *', base)
n7 = itr.get_next()
self.assertEqual(n7.minute, 38)
self.assertEqual(n7.hour, 12)
n8 = itr.get_next()
self.assertEqual(n8.minute, 4)
self.assertEqual(n8.hour, 13)
def testHour(self):
base = datetime.datetime(2010, 1, 24, 12, 2)
itr = CronParser('0 */3 * * *', base)
n1 = itr.get_next()
self.assertEqual(n1.hour, 15)
self.assertEqual(n1.minute, 0)
for i in range(2):
itr.get_next()
n2 = itr.get_next()
self.assertEqual(n2.hour, 0)
self.assertEqual(n2.day, 25)
def testDay(self):
base = datetime.datetime(2010, 2, 24, 12, 9)
itr = CronParser('0 0 */3 * *', base)
n1 = itr.get_next()
# 1 4 7 10 13 16 19 22 25 28
self.assertEqual(n1.day, 25)
n2 = itr.get_next()
self.assertEqual(n2.day, 28)
n3 = itr.get_next()
self.assertEqual(n3.day, 1)
self.assertEqual(n3.month, 3)
# test leap year
base = datetime.datetime(1996, 2, 27)
itr = CronParser('0 0 * * *', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 28)
self.assertEqual(n1.month, 2)
n2 = itr.get_next()
self.assertEqual(n2.day, 29)
self.assertEqual(n2.month, 2)
base2 = datetime.datetime(2000, 2, 27)
itr2 = CronParser('0 0 * * *', base2)
n3 = itr2.get_next()
self.assertEqual(n3.day, 28)
self.assertEqual(n3.month, 2)
n4 = itr2.get_next()
self.assertEqual(n4.day, 29)
self.assertEqual(n4.month, 2)
def testWeekDay(self):
base = datetime.datetime(2010, 2, 25)
itr = CronParser('0 0 * * sat', base)
n1 = itr.get_next()
self.assertEqual(n1.isoweekday(), 6)
self.assertEqual(n1.day, 27)
n2 = itr.get_next()
self.assertEqual(n2.isoweekday(), 6)
self.assertEqual(n2.day, 6)
self.assertEqual(n2.month, 3)
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 * wed', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 1)
self.assertEqual(n1.day, 27)
self.assertEqual(n1.year, 2010)
n2 = itr.get_next()
self.assertEqual(n2.month, 2)
self.assertEqual(n2.day, 1)
self.assertEqual(n2.year, 2010)
n3 = itr.get_next()
self.assertEqual(n3.month, 2)
self.assertEqual(n3.day, 3)
self.assertEqual(n3.year, 2010)
def testMonth(self):
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 * *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 2)
self.assertEqual(n1.day, 1)
n2 = itr.get_next()
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, 1)
for i in range(8):
itr.get_next()
n3 = itr.get_next()
self.assertEqual(n3.month, 12)
self.assertEqual(n3.year, 2010)
n4 = itr.get_next()
self.assertEqual(n4.month, 1)
self.assertEqual(n4.year, 2011)
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 */4 *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 5)
self.assertEqual(n1.day, 1)
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 1-3 *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 2)
self.assertEqual(n1.day, 1)
n2 = itr.get_next()
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, 1)
n3 = itr.get_next()
self.assertEqual(n3.month, 1)
self.assertEqual(n3.day, 1)
def testSundayToThursdayWithAlphaConversion(self):
base = datetime.datetime(2010, 8, 25, 15, 56)
itr = CronParser("30 22 * * sun-thu", base)
n1 = itr.get_next()
self.assertEqual(base.year, n1.year)
self.assertEqual(base.month, n1.month)
self.assertEqual(base.day, n1.day)
self.assertEqual(22, n1.hour)
self.assertEqual(30, n1.minute)
def testISOWeekday(self):
base = datetime.datetime(2010, 2, 25)
itr = CronParser('0 0 * * 7', base)
n1 = itr.get_next()
self.assertEqual(n1.isoweekday(), 7)
self.assertEqual(n1.day, 28)
n2 = itr.get_next()
self.assertEqual(n2.isoweekday(), 7)
self.assertEqual(n2.day, 7)
self.assertEqual(n2.month, 3)
base = datetime.datetime(2010, 2, 22)
itr = CronParser('0 0 * * */2', base)
n1 = itr.get_next()
self.assertEqual(n1.isoweekday(), 2)
self.assertEqual(n1.day, 23)
n2 = itr.get_next()
self.assertEqual(n2.isoweekday(), 4)
self.assertEqual(n2.day, 25)
def testBug2(self):
base = datetime.datetime(2012, 1, 1, 0, 0)
itr = CronParser('0 * * 3 *', base)
n1 = itr.get_next()
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.month, 3)
self.assertEqual(n1.day, base.day)
self.assertEqual(n1.hour, base.hour)
self.assertEqual(n1.minute, base.minute)
n2 = itr.get_next()
self.assertEqual(n2.year, base.year)
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, base.day)
self.assertEqual(n2.hour, base.hour + 1)
self.assertEqual(n2.minute, base.minute)
n3 = itr.get_next()
self.assertEqual(n3.year, base.year)
self.assertEqual(n3.month, 3)
self.assertEqual(n3.day, base.day)
self.assertEqual(n3.hour, base.hour + 2)
self.assertEqual(n3.minute, base.minute)
def testBug3(self):
base = datetime.datetime(2013, 3, 1, 12, 17, 34, 257877)
c = CronParser('00 03 16,30 * *', base)
n1 = c.get_next()
self.assertEqual(n1.month, 3)
self.assertEqual(n1.day, 16)
n2 = c.get_next()
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, 30)
n3 = c.get_next()
self.assertEqual(n3.month, 4)
self.assertEqual(n3.day, 16)
def test_rangeGenerator(self):
base = datetime.datetime(2013, 3, 4, 0, 0)
itr = CronParser('1-9/2 0 1 * *', base)
n1 = itr.get_next()
n2 = itr.get_next()
n3 = itr.get_next()
n4 = itr.get_next()
n5 = itr.get_next()
self.assertEqual(n1.minute, 1)
self.assertEqual(n2.minute, 3)
self.assertEqual(n3.minute, 5)
self.assertEqual(n4.minute, 7)
self.assertEqual(n5.minute, 9)
def test_iterGenerator(self):
base = datetime.datetime(2013, 3, 4, 0, 0)
itr = CronParser('1-9/2 0 1 * *', base)
x = 0
for n in itr:
x += 1
if x > 4:
break
self.assertEqual(n.minute, 9)
def test_invalidcron(self):
base = datetime.datetime(2013, 3, 4, 0, 0)
itr = CronParser('5 4 31 2 *', base)
self.assertRaises(ValueError, itr.get_next)
itr = CronParser('* * 5-1 * *', base)
self.assertRaises(ValueError, itr.get_next)
itr = CronParser('* * * janu-jun *', base)
self.assertRaises(KeyError, itr.get_next)
itr = CronParser('* * * * * *', base)
self.assertRaises(ValueError, itr.get_next)
itr = CronParser('* * * *', base)
self.assertRaises(ValueError, itr.get_next)
def testLastDayOfMonth(self):
base = datetime.datetime(2015, 9, 4)
itr = CronParser('0 0 L * *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 9)
self.assertEqual(n1.day, 30)
n2 = itr.get_next()
self.assertEqual(n2.month, 10)
self.assertEqual(n2.day, 31)
n3 = itr.get_next()
self.assertEqual(n3.month, 11)
self.assertEqual(n3.day, 30)
n4 = itr.get_next()
self.assertEqual(n4.month, 12)
self.assertEqual(n4.day, 31)
base = datetime.datetime(1996, 2, 27)
itr = CronParser('0 0 L * *', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 29)
self.assertEqual(n1.month, 2)
n2 = itr.get_next()
self.assertEqual(n2.day, 31)
self.assertEqual(n2.month, 3)
def testSpecialExpr(self):
base = datetime.datetime(2000, 1, 1)
itr = CronParser('@yearly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, 1)
self.assertEqual(n1.year, base.year + 1)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@annually', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, 1)
self.assertEqual(n1.year, base.year + 1)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@monthly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, base.month + 1)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@weekly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 2)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
n2 = itr.get_next()
self.assertEqual(n2.day, 9)
self.assertEqual(n2.month, base.month)
self.assertEqual(n2.year, base.year)
self.assertEqual(n2.hour, 0)
self.assertEqual(n2.minute, 0)
n3 = itr.get_next()
self.assertEqual(n3.day, 16)
self.assertEqual(n3.month, base.month)
self.assertEqual(n3.year, base.year)
self.assertEqual(n3.hour, 0)
self.assertEqual(n3.minute, 0)
itr = CronParser('@daily', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 2)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@midnight', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 2)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@hourly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 1)
self.assertEqual(n1.minute, 0)
class TestsForJobGraph(BaseTestScheduler):
def testJobGraph(self):
s = Scheduler(self.db)
myjob = JobGraph(self.db, 'job_1')
fname = 'foo'
# We have a few items to wear, and there's an "order" to respect...
# Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
# Now, we can't put on the tie without wearing the shirt first, etc...
watch = s.queue_task(fname, task_name='watch')
jacket = s.queue_task(fname, task_name='jacket')
shirt = s.queue_task(fname, task_name='shirt')
tie = s.queue_task(fname, task_name='tie')
pants = s.queue_task(fname, task_name='pants')
undershorts = s.queue_task(fname, task_name='undershorts')
belt = s.queue_task(fname, task_name='belt')
shoes = s.queue_task(fname, task_name='shoes')
socks = s.queue_task(fname, task_name='socks')
# before the tie, comes the shirt
myjob.add_deps(tie.id, shirt.id)
# before the belt too comes the shirt
myjob.add_deps(belt.id, shirt.id)
# before the jacket, comes the tie
myjob.add_deps(jacket.id, tie.id)
# before the belt, come the pants
myjob.add_deps(belt.id, pants.id)
# before the shoes, comes the pants
myjob.add_deps(shoes.id, pants.id)
# before the pants, comes the undershorts
myjob.add_deps(pants.id, undershorts.id)
# before the shoes, comes the undershorts
myjob.add_deps(shoes.id, undershorts.id)
# before the jacket, comes the belt
myjob.add_deps(jacket.id, belt.id)
# before the shoes, comes the socks
myjob.add_deps(shoes.id, socks.id)
## results in the following topological sort
# 9,3,6 --> 4,5 --> 8,7 --> 2
# socks, shirt, undershorts
# tie, pants
# shoes, belt
# jacket
known_toposort = [
set([socks.id, shirt.id, undershorts.id]),
set([tie.id, pants.id]),
set([shoes.id, belt.id]),
set([jacket.id])
]
toposort = myjob.validate('job_1')
self.assertEqual(toposort, known_toposort)
# add a cyclic dependency, jacket to undershorts
myjob.add_deps(undershorts.id, jacket.id)
# no exceptions raised, but result None
self.assertEqual(myjob.validate('job_1'), None)
def testJobGraphFailing(self):
s = Scheduler(self.db)
myjob = JobGraph(self.db, 'job_1')
fname = 'foo'
# We have a few items to wear, and there's an "order" to respect...
# Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
# Now, we can't put on the tie without wearing the shirt first, etc...
watch = s.queue_task(fname, task_name='watch')
jacket = s.queue_task(fname, task_name='jacket')
shirt = s.queue_task(fname, task_name='shirt')
tie = s.queue_task(fname, task_name='tie')
pants = s.queue_task(fname, task_name='pants')
undershorts = s.queue_task(fname, task_name='undershorts')
belt = s.queue_task(fname, task_name='belt')
shoes = s.queue_task(fname, task_name='shoes')
socks = s.queue_task(fname, task_name='socks')
# before the tie, comes the shirt
myjob.add_deps(tie.id, shirt.id)
# before the belt too comes the shirt
myjob.add_deps(belt.id, shirt.id)
# before the jacket, comes the tie
myjob.add_deps(jacket.id, tie.id)
# before the belt, come the pants
myjob.add_deps(belt.id, pants.id)
# before the shoes, comes the pants
myjob.add_deps(shoes.id, pants.id)
# before the pants, comes the undershorts
myjob.add_deps(pants.id, undershorts.id)
# before the shoes, comes the undershorts
myjob.add_deps(shoes.id, undershorts.id)
# before the jacket, comes the belt
myjob.add_deps(jacket.id, belt.id)
# before the shoes, comes the socks
myjob.add_deps(shoes.id, socks.id)
# add a cyclic dependency, jacket to undershorts
myjob.add_deps(undershorts.id, jacket.id)
# no exceptions raised, but result None
self.assertEqual(myjob.validate('job_1'), None)
# and no deps added
deps_inserted = self.db(self.db.scheduler_task_deps.id>0).count()
self.assertEqual(deps_inserted, 0)
def testJobGraphDifferentJobs(self):
s = Scheduler(self.db)
myjob1 = JobGraph(self.db, 'job_1')
myjob2 = JobGraph(self.db, 'job_2')
fname = 'foo'
# We have a few items to wear, and there's an "order" to respect...
# Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
# Now, we can't put on the tie without wearing the shirt first, etc...
watch = s.queue_task(fname, task_name='watch')
jacket = s.queue_task(fname, task_name='jacket')
shirt = s.queue_task(fname, task_name='shirt')
tie = s.queue_task(fname, task_name='tie')
pants = s.queue_task(fname, task_name='pants')
undershorts = s.queue_task(fname, task_name='undershorts')
belt = s.queue_task(fname, task_name='belt')
shoes = s.queue_task(fname, task_name='shoes')
socks = s.queue_task(fname, task_name='socks')
# before the tie, comes the shirt
myjob1.add_deps(tie.id, shirt.id)
# before the belt too comes the shirt
myjob1.add_deps(belt.id, shirt.id)
# before the jacket, comes the tie
myjob1.add_deps(jacket.id, tie.id)
# before the belt, come the pants
myjob1.add_deps(belt.id, pants.id)
# before the shoes, comes the pants
myjob2.add_deps(shoes.id, pants.id)
# before the pants, comes the undershorts
myjob2.add_deps(pants.id, undershorts.id)
# before the shoes, comes the undershorts
myjob2.add_deps(shoes.id, undershorts.id)
# before the jacket, comes the belt
myjob2.add_deps(jacket.id, belt.id)
# before the shoes, comes the socks
myjob2.add_deps(shoes.id, socks.id)
# every job by itself can be completed
self.assertNotEqual(myjob1.validate('job_1'), None)
self.assertNotEqual(myjob1.validate('job_2'), None)
# and, implicitly, every queued task can be too
self.assertNotEqual(myjob1.validate(), None)
# add a cyclic dependency, jacket to undershorts
myjob2.add_deps(undershorts.id, jacket.id)
# every job can still be completed by itself
self.assertNotEqual(myjob1.validate('job_1'), None)
self.assertNotEqual(myjob1.validate('job_2'), None)
# but trying to see if every task will ever be completed fails
self.assertEqual(myjob2.validate(), None)
class TestsForSchedulerAPIs(BaseTestScheduler):
def testQueue_Task(self):
def isnotqueued(result):
self.assertEqual(result.id, None)
self.assertEqual(result.uuid, None)
self.assertEqual(len(result.errors.keys()) > 0, True)
def isqueued(result):
self.assertNotEqual(result.id, None)
self.assertNotEqual(result.uuid, None)
self.assertEqual(len(result.errors.keys()), 0)
s = Scheduler(self.db)
fname = 'foo'
watch = s.queue_task(fname, task_name='watch')
# queuing a task returns id, errors, uuid
self.assertEqual(set(watch.keys()), set(['id', 'uuid', 'errors']))
# queueing nothing isn't allowed
self.assertRaises(TypeError, s.queue_task, *[])
# passing pargs and pvars wrongly
# # pargs as dict
isnotqueued(s.queue_task(fname, dict(a=1), dict(b=1)))
# # pvars as list
isnotqueued(s.queue_task(fname, ['foo', 'bar'], ['foo', 'bar']))
# two tasks with the same uuid won't be there
isqueued(s.queue_task(fname, uuid='a'))
isnotqueued(s.queue_task(fname, uuid='a'))
# # #FIXME add here every parameter
def testTask_Status(self):
s = Scheduler(self.db)
fname = 'foo'
watch = s.queue_task(fname, task_name='watch')
# fetch status by id
by_id = s.task_status(watch.id)
# fetch status by uuid
by_uuid = s.task_status(watch.uuid)
# fetch status by query
by_query = s.task_status(self.db.scheduler_task.function_name == 'foo')
self.assertEqual(by_id, by_uuid)
self.assertEqual(by_id, by_query)
# fetch status by anything else throws
self.assertRaises(SyntaxError, s.task_status, *[[1, 2]])
# adding output returns the joined set, plus "result"
rtn = s.task_status(watch.id, output=True)
self.assertEqual(set(rtn.keys()), set(['scheduler_run', 'scheduler_task', 'result']))
class testForSchedulerRunnerBase(BaseTestScheduler):
def inner_teardown(self):
from gluon import current
fdest = os.path.join(current.request.folder, 'models', 'scheduler.py')
os.unlink(fdest)
additional_files = [
os.path.join(current.request.folder, 'private', 'demo8.pholder'),
os.path.join(current.request.folder, 'views', 'issue_1485_2.html'),
]
for f in additional_files:
try:
os.unlink(f)
except:
pass
def writeview(self, content, dest=None):
from gluon import current
fdest = os.path.join(current.request.folder, 'views', dest)
with open(fdest, 'w') as q:
q.write(content)
def writefunction(self, content, initlines=None):
from gluon import current
fdest = os.path.join(current.request.folder, 'models', 'scheduler.py')
if initlines is None:
initlines = """
import os
import time
from gluon.scheduler import Scheduler
db_dal = os.path.abspath(os.path.join(request.folder, '..', '..', 'dummy2.db'))
sched_dal = DAL('sqlite://%s' % db_dal, folder=os.path.dirname(db_dal))
sched = Scheduler(sched_dal, max_empty_runs=15, migrate=False, heartbeat=1)
def termination():
sched.terminate()
sched_dal.commit()
"""
with open(fdest, 'w') as q:
q.write(initlines)
q.write(content)
def exec_sched(self):
import subprocess
call_args = [sys.executable, 'web2py.py', '--no-banner', '-D', '20','-K', 'welcome']
ret = subprocess.call(call_args, env=dict(os.environ))
return ret
def fetch_results(self, sched, task):
info = sched.task_status(task.id)
task_runs = self.db(self.db.scheduler_run.task_id == task.id).select()
return info, task_runs
def exec_asserts(self, stmts, tag):
for stmt in stmts:
self.assertEqual(stmt[1], True, msg="%s - %s" % (tag, stmt[0]))
class TestsForSchedulerRunner(testForSchedulerRunnerBase):
def testRepeats_and_Expired_and_Prio(self):
s = Scheduler(self.db)
repeats = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), repeats=2, period=5)
a_while_ago = datetime.datetime.now() - datetime.timedelta(seconds=60)
expired = s.queue_task('demo4', stop_time=a_while_ago)
prio1 = s.queue_task('demo1', ['scheduled_first'])
prio2 = s.queue_task('demo1', ['scheduled_second'], next_run_time=a_while_ago)
self.db.commit()
self.writefunction(r"""
def demo1(*args,**vars):
print('you passed args=%s and vars=%s' % (args, vars))
return args[0]
def demo4():
time.sleep(15)
print("I'm printing something")
return dict(a=1, b=2)
""")
ret = self.exec_sched()
self.assertEqual(ret, 0)
# repeats check
task, task_run = self.fetch_results(s, repeats)
res = [
("task status completed", task.status == 'COMPLETED'),
("task times_run is 2", task.times_run == 2),
("task ran 2 times only", len(task_run) == 2),
("scheduler_run records are COMPLETED ", (task_run[0].status == task_run[1].status == 'COMPLETED')),
("period is respected", (task_run[1].start_time > task_run[0].start_time + datetime.timedelta(seconds=task.period)))
]
self.exec_asserts(res, 'REPEATS')
# expired check
task, task_run = self.fetch_results(s, expired)
res = [
("task status expired", task.status == 'EXPIRED'),
("task times_run is 0", task.times_run == 0),
("task didn't run at all", len(task_run) == 0)
]
self.exec_asserts(res, 'EXPIRATION')
# prio check
task1 = s.task_status(prio1.id, output=True)
task2 = s.task_status(prio2.id, output=True)
res = [
("tasks status completed", task1.scheduler_task.status == task2.scheduler_task.status == 'COMPLETED'),
("priority2 was executed before priority1" , task1.scheduler_run.id > task2.scheduler_run.id)
]
self.exec_asserts(res, 'PRIORITY')
def testNoReturn_and_Timeout_and_Progress(self):
s = Scheduler(self.db)
noret1 = s.queue_task('demo5')
noret2 = s.queue_task('demo3')
timeout1 = s.queue_task('demo4', timeout=5)
timeout2 = s.queue_task('demo4')
progress = s.queue_task('demo6', sync_output=2)
termination = s.queue_task('termination')
self.db.commit()
self.writefunction(r"""
def demo3():
time.sleep(3)
print(1/0)
return None
def demo4():
time.sleep(15)
print("I'm printing something")
return dict(a=1, b=2)
def demo5():
time.sleep(3)
print("I'm printing something")
rtn = dict(a=1, b=2)
def demo6():
time.sleep(5)
print('50%')
time.sleep(5)
print('!clear!100%')
return 1
""")
ret = self.exec_sched()
self.assertEqual(ret, 0)
# noreturn check
task1, task_run1 = self.fetch_results(s, noret1)
task2, task_run2 = self.fetch_results(s, noret2)
res = [
("tasks no_returns1 completed", task1.status == 'COMPLETED'),
("tasks no_returns2 failed", task2.status == 'FAILED'),
("no_returns1 doesn't have a scheduler_run record", len(task_run1) == 0),
("no_returns2 has a scheduler_run record FAILED", (len(task_run2) == 1 and task_run2[0].status == 'FAILED')),
]
self.exec_asserts(res, 'NO_RETURN')
# timeout check
task1 = s.task_status(timeout1.id, output=True)
task2 = s.task_status(timeout2.id, output=True)
res = [
("tasks timeouts1 timeoutted", task1.scheduler_task.status == 'TIMEOUT'),
("tasks timeouts2 completed", task2.scheduler_task.status == 'COMPLETED')
]
self.exec_asserts(res, 'TIMEOUT')
# progress check
task1 = s.task_status(progress.id, output=True)
res = [
("tasks percentages completed", task1.scheduler_task.status == 'COMPLETED'),
("output contains only 100%", task1.scheduler_run.run_output.strip() == "100%")
]
self.exec_asserts(res, 'PROGRESS')
def testDrift_and_env_and_immediate(self):
s = Scheduler(self.db)
immediate = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), immediate=True)
env = s.queue_task('demo7')
drift = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), period=93, prevent_drift=True)
termination = s.queue_task('termination')
self.db.commit()
self.writefunction(r"""
def demo1(*args,**vars):
print('you passed args=%s and vars=%s' % (args, vars))
return args[0]
import random
def demo7():
time.sleep(random.randint(1,5))
print(W2P_TASK, request.now)
return W2P_TASK.id, W2P_TASK.uuid, W2P_TASK.run_id
""")
ret = self.exec_sched()
self.assertEqual(ret, 0)
# immediate check, can only check that nothing breaks
task1 = s.task_status(immediate.id)
res = [
("tasks status completed", task1.status == 'COMPLETED'),
]
self.exec_asserts(res, 'IMMEDIATE')
# drift check
task, task_run = self.fetch_results(s, drift)
res = [
("task status completed", task.status == 'COMPLETED'),
("next_run_time is exactly start_time + period", (task.next_run_time == task.start_time + datetime.timedelta(seconds=task.period)))
]
self.exec_asserts(res, 'DRIFT')
# env check
task1 = s.task_status(env.id, output=True)
res = [
("task %s returned W2P_TASK correctly" % (task1.scheduler_task.id), task1.result == [task1.scheduler_task.id, task1.scheduler_task.uuid, task1.scheduler_run.id]),
]
self.exec_asserts(res, 'ENV')
def testRetryFailed(self):
s = Scheduler(self.db)
failed = s.queue_task('demo2', retry_failed=1, period=1)
failed_consecutive = s.queue_task('demo8', retry_failed=2, repeats=2, period=1)
self.db.commit()
self.writefunction(r"""
def demo2():
1/0
def demo8():
placeholder = os.path.join(request.folder, 'private', 'demo8.pholder')
with open(placeholder, 'a') as g:
g.write('\nplaceholder for demo8 created')
num_of_lines = 0
with open(placeholder) as f:
num_of_lines = len([a for a in f.read().split('\n') if a])
print('number of lines', num_of_lines)
if num_of_lines <= 2:
1/0
else:
os.unlink(placeholder)
return 1
""")
ret = self.exec_sched()
# process finished just fine
self.assertEqual(ret, 0)
# failed - checks
task, task_run = self.fetch_results(s, failed)
res = [
("task status failed", task.status == 'FAILED'),
("task times_run is 0", task.times_run == 0),
("task times_failed is 2", task.times_failed == 2),
("task ran 2 times only", len(task_run) == 2),
("scheduler_run records are FAILED", (task_run[0].status == task_run[1].status == 'FAILED')),
("period is respected", (task_run[1].start_time > task_run[0].start_time + datetime.timedelta(seconds=task.period)))
]
self.exec_asserts(res, 'FAILED')
# failed consecutive - checks
task, task_run = self.fetch_results(s, failed_consecutive)
res = [
("task status completed", task.status == 'COMPLETED'),
("task times_run is 2", task.times_run == 2),
("task times_failed is 0", task.times_failed == 0),
("task ran 6 times", len(task_run) == 6),
("scheduler_run records for COMPLETED is 2", len([run.status for run in task_run if run.status == 'COMPLETED']) == 2),
("scheduler_run records for FAILED is 4", len([run.status for run in task_run if run.status == 'FAILED']) == 4),
]
self.exec_asserts(res, 'FAILED_CONSECUTIVE')
def testRegressions(self):
s = Scheduler(self.db)
huge_result = s.queue_task('demo10', retry_failed=1, period=1)
issue_1485 = s.queue_task('issue_1485')
termination = s.queue_task('termination')
self.db.commit()
self.writefunction(r"""
def demo10():
res = 'a' * 99999
return dict(res=res)
def issue_1485():
return response.render('issue_1485.html', dict(variable='abc'))
""")
self.writeview(r"""{{=variable}}""", 'issue_1485.html')
ret = self.exec_sched()
# process finished just fine
self.assertEqual(ret, 0)
# huge_result - checks
task_huge = s.task_status(huge_result.id, output=True)
res = [
("task status completed", task_huge.scheduler_task.status == 'COMPLETED'),
("task times_run is 1", task_huge.scheduler_task.times_run == 1),
("result is the correct one", task_huge.result == dict(res='a' * 99999))
]
self.exec_asserts(res, 'HUGE_RESULT')
task_issue_1485 = s.task_status(issue_1485.id, output=True)
res = [
("task status completed", task_issue_1485.scheduler_task.status == 'COMPLETED'),
("task times_run is 1", task_issue_1485.scheduler_task.times_run == 1),
("result is the correct one", task_issue_1485.result == 'abc')
]
self.exec_asserts(res, 'issue_1485')
if __name__ == '__main__':
unittest.main()