79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
import time
|
|
import pickle
|
|
from pydal import DAL, Field
|
|
from ._compat import unittest
|
|
from ._adapt import DEFAULT_URI, IS_IMAP, IS_MSSQL
|
|
from ._helpers import DALtest
|
|
|
|
|
|
class SimpleCache(object):
|
|
storage = {}
|
|
|
|
def clear(self):
|
|
self.storage.clear()
|
|
|
|
def _encode(self, value):
|
|
return value
|
|
|
|
def _decode(self, value):
|
|
return value
|
|
|
|
def __call__(self, key, f, time_expire=300):
|
|
dt = time_expire
|
|
now = time.time()
|
|
|
|
item = self.storage.get(key, None)
|
|
if item and f is None:
|
|
del self.storage[key]
|
|
|
|
if f is None:
|
|
return None
|
|
if item and (dt is None or item[0] > now - dt):
|
|
return self._decode(item[1])
|
|
|
|
value = f()
|
|
self.storage[key] = (now, self._encode(value))
|
|
return value
|
|
|
|
class PickleCache(SimpleCache):
|
|
def _encode(self, value):
|
|
return pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
|
|
|
|
def _decode(self, value):
|
|
return pickle.loads(value)
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestCache(DALtest):
|
|
def testRun(self):
|
|
cache = SimpleCache()
|
|
db = self.connect()
|
|
db.define_table('tt', Field('aa'))
|
|
db.tt.insert(aa='1')
|
|
r0 = db().select(db.tt.ALL)
|
|
r1 = db().select(db.tt.ALL, cache=(cache, 1000))
|
|
self.assertEqual(len(r0), len(r1))
|
|
r2 = db().select(db.tt.ALL, cache=(cache, 1000))
|
|
self.assertEqual(len(r0), len(r2))
|
|
r3 = db().select(db.tt.ALL, cache=(cache, 1000), cacheable=True)
|
|
self.assertEqual(len(r0), len(r3))
|
|
r4 = db().select(db.tt.ALL, cache=(cache, 1000), cacheable=True)
|
|
self.assertEqual(len(r0), len(r4))
|
|
|
|
@unittest.skipIf(IS_MSSQL, "Class nesting in ODBC driver breaks pickle")
|
|
def testPickling(self):
|
|
db = self.connect()
|
|
cache = (PickleCache(), 1000)
|
|
db.define_table('tt', Field('aa'), Field('bb', type='integer'),
|
|
Field('cc', type='decimal(5,2)'))
|
|
db.tt.insert(aa='1', bb=2, cc=3)
|
|
r0 = db(db.tt).select(db.tt.ALL)
|
|
csv0 = str(r0)
|
|
r1 = db(db.tt).select(db.tt.ALL, cache=cache)
|
|
self.assertEqual(csv0, str(r1))
|
|
r2 = db(db.tt).select(db.tt.ALL, cache=cache)
|
|
self.assertEqual(csv0, str(r2))
|
|
r3 = db(db.tt).select(db.tt.ALL, cache=cache, cacheable=True)
|
|
self.assertEqual(csv0, str(r3))
|
|
r4 = db(db.tt).select(db.tt.ALL, cache=cache, cacheable=True)
|
|
self.assertEqual(csv0, str(r4))
|