SP/web2py/gluon/packages/dal/tests/contribs.py
Saturneic 064f602b1a Add.
2018-10-25 23:33:13 +08:00

159 lines
4.8 KiB
Python

import os
import threading
import time
from ._compat import unittest
from ._adapt import IS_GAE
from pydal._compat import to_bytes
from pydal.contrib.portalocker import lock, unlock, read_locked, write_locked
from pydal.contrib.portalocker import LockedFile, LOCK_EX
def tearDownModule():
    """Delete the scratch file ``test.txt`` once the module's tests are done.

    Does nothing when ``test.txt`` is not an existing regular file.
    """
    scratch = 'test.txt'
    if not os.path.isfile(scratch):
        return
    os.remove(scratch)
class testPortalocker(unittest.TestCase):
    """Tests for the ``pydal.contrib.portalocker`` helpers.

    Every test operates on the scratch file ``test.txt`` in the current
    working directory. The timing-based tests sleep for 2 seconds in a
    helper thread and assert that the blocked call in the main thread
    took at least that long.
    """

    def test_LockedFile(self):
        """Bytes written through one LockedFile are read back by another."""
        f = LockedFile('test.txt', mode='wb')
        f.write(to_bytes('test ok'))
        f.close()
        f = LockedFile('test.txt', mode='rb')
        self.assertEqual(f.read(), to_bytes('test ok'))
        f.close()

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_openmultiple(self):
        """Ten threads append through LockedFile; check their timings."""
        n_workers = 10
        t0 = time.time()

        def worker1():
            # Note when the worker began, keep the file open for 2 seconds,
            # then append one "<start>\t<end>" line.
            start = time.time()
            f1 = LockedFile('test.txt', mode='ab')
            time.sleep(2)
            f1.write(to_bytes("%s\t%s\n" % (start, time.time())))
            f1.close()

        # Truncate the scratch file so it only holds this test's lines.
        f = LockedFile('test.txt', mode='wb')
        f.write(to_bytes(''))
        f.close()
        th = []
        for _ in range(n_workers):
            t1 = threading.Thread(target=worker1)
            th.append(t1)
            t1.start()
        for t in th:
            t.join()
        with open('test.txt') as g:
            content = g.read()
        results = [line.strip().split('\t')
                   for line in content.split('\n') if line]
        # every worker wrote exactly one line; without this check the
        # assertions below would pass vacuously on an empty file
        self.assertEqual(len(results), n_workers)
        # all started at more or less the same time
        starts = [line for line in results if float(line[0]) - t0 < 1]
        self.assertEqual(len(starts), len(results))
        # end - start is at least 2
        for line in results:
            self.assertTrue(float(line[1]) - float(line[0]) >= 2)
        # ends are not the same
        ends = [line[1] for line in results]
        self.assertEqual(len(set(ends)), len(ends))

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_lock_unlock(self):
        """read_locked waits until the writer unlocks or closes the file."""
        def worker1(fh):
            time.sleep(2)
            unlock(fh)

        def worker2(fh):
            time.sleep(2)
            fh.close()

        f = open('test.txt', mode='wb')
        lock(f, LOCK_EX)
        f.write(to_bytes('test ok'))
        t1 = threading.Thread(target=worker1, args=(f, ))
        # Take the timestamp before starting the thread so the measured
        # interval always covers the worker's full 2-second sleep.
        start = int(time.time())
        t1.start()
        content = read_locked('test.txt')
        end = int(time.time())
        t1.join()
        f.close()
        # it took at least 2 seconds to read
        # although nothing is there until .close()
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, to_bytes(''))
        content = read_locked('test.txt')
        self.assertEqual(content, to_bytes('test ok'))

        f = LockedFile('test.txt', mode='wb')
        f.write(to_bytes('test ok'))
        t1 = threading.Thread(target=worker2, args=(f, ))
        start = int(time.time())
        t1.start()
        content = read_locked('test.txt')
        end = int(time.time())
        t1.join()
        # it took at least 2 seconds to read
        # content is there because we called close()
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, to_bytes('test ok'))

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_read_locked(self):
        """read_locked returns the data once the LockedFile writer closes."""
        def worker(fh):
            time.sleep(2)
            fh.close()

        f = LockedFile('test.txt', mode='wb')
        f.write(to_bytes('test ok'))
        t1 = threading.Thread(target=worker, args=(f, ))
        # Timestamp first, then start the thread (see test_lock_unlock).
        start = int(time.time())
        t1.start()
        content = read_locked('test.txt')
        end = int(time.time())
        t1.join()
        # it took at least 2 seconds to read
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, to_bytes('test ok'))

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_write_locked(self):
        """write_locked waits until the handle holding the lock is closed."""
        def worker(fh):
            time.sleep(2)
            fh.close()

        f = open('test.txt', mode='wb')
        lock(f, LOCK_EX)
        t1 = threading.Thread(target=worker, args=(f, ))
        # Timestamp first so the interval covers the worker's whole sleep.
        start = int(time.time())
        t1.start()
        write_locked('test.txt', to_bytes('test ok'))
        end = int(time.time())
        t1.join()
        with open('test.txt') as g:
            content = g.read()
        # it took at least 2 seconds to write
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, 'test ok')

    def test_exception(self):
        """LockedFile raises RuntimeError when given the mode 'x'."""
        self.assertRaises(RuntimeError, LockedFile, 'test.txt', 'x')

    def test_readline(self):
        """readline() yields lines in order; readlines() returns both."""
        f = LockedFile('test.txt', 'wb')
        f.write(to_bytes('abc\n'))
        f.write(to_bytes('123\n'))
        f.close()
        f = LockedFile('test.txt', 'rb')
        rl = f.readline()
        self.assertTrue(to_bytes('abc') in rl)
        rl = f.readline()
        self.assertTrue(to_bytes('123') in rl)
        f.close()
        f = LockedFile('test.txt', 'rb')
        rls = f.readlines()
        f.close()
        self.assertEqual(len(rls), 2)