SP/web2py/gluon/packages/dal/tests/sql.py
Saturneic 064f602b1a Add.
2018-10-25 23:33:13 +08:00

3014 lines
120 KiB
Python

# -*- coding: utf-8 -*-
"""
Basic unit tests
"""
from __future__ import print_function
import os
import glob
import datetime
import json
import pickle
from pydal._compat import basestring, StringIO, integer_types, xrange, BytesIO, to_bytes
from pydal import DAL, Field
from pydal.helpers.classes import SQLALL, OpRow
from pydal.objects import Table, Expression, Row
from ._compat import unittest
from ._adapt import (
DEFAULT_URI, IS_POSTGRESQL, IS_SQLITE, IS_MSSQL, IS_MYSQL, IS_TERADATA)
from ._helpers import DALtest
long = integer_types[-1]
print('Testing against %s engine (%s)' % (DEFAULT_URI.partition(':')[0],
DEFAULT_URI))
ALLOWED_DATATYPES = [
'string',
'text',
'integer',
'boolean',
'double',
'blob',
'date',
'time',
'datetime',
'upload',
'password',
'json',
'bigint'
]
def setUpModule():
if IS_MYSQL or IS_TERADATA:
db = DAL(DEFAULT_URI, check_reserved=['all'])
def clean_table(db, tablename):
try:
db.define_table(tablename)
except Exception as e:
pass
try:
db[tablename].drop()
except Exception as e:
pass
for tablename in ['tt', 't0', 't1', 't2', 't3', 't4',
'easy_name', 'tt_archive', 'pet_farm', 'person']:
clean_table(db, tablename)
db.close()
def tearDownModule():
if os.path.isfile('sql.log'):
os.unlink('sql.log')
for a in glob.glob('*.table'):
os.unlink(a)
class TestFields(DALtest):
def testFieldName(self):
"""
- a "str" something
- not a method or property of Table
- "dotted-notation" friendly:
- a valid python identifier
- not a python keyword
- not starting with underscore or an integer
- not containing dots
Basically, anything alphanumeric, no symbols, only underscore as
punctuation
"""
# Check that Fields cannot start with underscores
self.assertRaises(SyntaxError, Field, '_abc', 'string')
# Check that Fields cannot contain punctuation other than underscores
self.assertRaises(SyntaxError, Field, 'a.bc', 'string')
# Check that Fields cannot be a name of a method or property of Table
for x in ['drop', 'on', 'truncate']:
self.assertRaises(SyntaxError, Field, x, 'string')
# Check that Fields allows underscores in the body of a field name.
self.assertTrue(Field('a_bc', 'string'),
"Field isn't allowing underscores in fieldnames. It should.")
# Check that Field names don't allow a python keyword
self.assertRaises(SyntaxError, Field, 'True', 'string')
self.assertRaises(SyntaxError, Field, 'elif', 'string')
self.assertRaises(SyntaxError, Field, 'while', 'string')
# Check that Field names don't allow a non-valid python identifier
non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
for a in non_valid_examples:
self.assertRaises(SyntaxError, Field, a, 'string')
# Check that Field names don't allow a unicode string
non_valid_examples = non_valid_examples = ["ℙƴ☂ℌøἤ", u"ℙƴ☂ℌøἤ",
u'àè', u'ṧøмℯ', u'тεṧт', u'♥αłüℯṧ',
u'ℊεᾔ℮яαт℮∂', u'♭ƴ', u'ᾔ☤ρℌℓ☺ḓ']
for a in non_valid_examples:
self.assertRaises(SyntaxError, Field, a, 'string')
def testFieldTypes(self):
# Check that string, and password default length is 512
for typ in ['string', 'password']:
self.assertTrue(Field('abc', typ).length == 512,
"Default length for type '%s' is not 512 or 255" % typ)
# Check that upload default length is 512
self.assertTrue(Field('abc', 'upload').length == 512,
"Default length for type 'upload' is not 512")
# Check that Tables passed in the type creates a reference
self.assertTrue(Field('abc', Table(None, 'temp')).type
== 'reference temp',
'Passing a Table does not result in a reference type.')
def testFieldLabels(self):
# Check that a label is successfully built from the supplied fieldname
self.assertTrue(Field('abc', 'string').label == 'Abc',
'Label built is incorrect')
self.assertTrue(Field('abc_def', 'string').label == 'Abc Def',
'Label built is incorrect')
def testFieldFormatters(self): # Formatter should be called Validator
# Test the default formatters
for typ in ALLOWED_DATATYPES:
f = Field('abc', typ)
if typ not in ['date', 'time', 'datetime']:
isinstance(f.formatter('test'), str)
else:
isinstance(f.formatter(datetime.datetime.now()), str)
def testUploadField(self):
import tempfile
stream = tempfile.NamedTemporaryFile()
content = b"this is the stream content"
stream.write(content)
# rewind before inserting
stream.seek(0)
db = self.connect()
db.define_table('tt', Field('fileobj', 'upload',
uploadfolder=tempfile.gettempdir(),
autodelete=True))
f_id = db.tt.insert(fileobj=stream)
row = db.tt[f_id]
(retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj)
# name should be the same
self.assertEqual(retr_name, os.path.basename(stream.name))
# content should be the same
retr_content = retr_stream.read()
self.assertEqual(retr_content, content)
# close streams!
retr_stream.close()
# delete
row.delete_record()
# drop
db.tt.drop()
# this part is triggered only if fs (AKA pyfilesystem) module is installed
try:
from fs.memoryfs import MemoryFS
# rewind before inserting
stream.seek(0)
db.define_table('tt', Field('fileobj', 'upload',
uploadfs=MemoryFS(),
autodelete=True))
f_id = db.tt.insert(fileobj=stream)
row = db.tt[f_id]
(retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj)
# name should be the same
self.assertEqual(retr_name, os.path.basename(stream.name))
# content should be the same
retr_content = retr_stream.read()
self.assertEqual(retr_content, content)
# close streams
retr_stream.close()
stream.close()
# delete
row.delete_record()
# drop
db.tt.drop()
except ImportError:
pass
def testBlobBytes(self):
#Test blob with latin1 encoded bytes
db = self.connect()
obj = pickle.dumps('0')
db.define_table('tt', Field('aa', 'blob'))
self.assertEqual(db.tt.insert(aa=obj), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, obj)
self.assertEqual(db.tt[1].aa, obj)
self.assertEqual(BytesIO(to_bytes(db.tt[1].aa)).read(), obj)
db.tt.drop()
def testRun(self):
"""Test all field types and their return values"""
db = self.connect()
for ft in ['string', 'text', 'password', 'upload', 'blob']:
db.define_table('tt', Field('aa', ft, default=''))
self.assertEqual(db.tt.insert(aa='ö'), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 'ö')
db.tt.drop()
db.define_table('tt', Field('aa', 'integer', default=1))
self.assertEqual(db.tt.insert(aa=3), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
db.tt.drop()
db.define_table('tt', Field('aa', 'string'))
ucs = 'A\xc3\xa9 A'
self.assertEqual(db.tt.insert(aa=ucs), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, ucs)
self.assertEqual(db().select(db.tt.aa.with_alias('zz'))[0].zz, ucs)
db.tt.drop()
db.define_table('tt', Field('aa', 'double', default=1))
self.assertEqual(db.tt.insert(aa=3.1), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
db.tt.drop()
db.define_table('tt', Field('aa', 'boolean', default=True))
self.assertEqual(db.tt.insert(aa=True), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, True)
db.tt.drop()
db.define_table('tt', Field('aa', 'json', default={}))
# test different python objects for correct serialization in json
objs = [
{'a' : 1, 'b' : 2},
[1, 2, 3],
'abc',
True,
False,
None,
11,
14.3,
long(11)
]
for obj in objs:
rtn_id = db.tt.insert(aa=obj)
rtn = db(db.tt.id == rtn_id).select().first().aa
self.assertEqual(obj, rtn)
db.tt.drop()
db.define_table('tt', Field('aa', 'date',
default=datetime.date.today()))
t0 = datetime.date.today()
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
db.tt.drop()
db.define_table('tt', Field('aa', 'datetime',
default=datetime.datetime.today()))
t0 = datetime.datetime(
1971,
12,
21,
10,
30,
55,
0,
)
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
## Row APIs
row = db().select(db.tt.aa)[0]
self.assertEqual(db.tt[1].aa,t0)
self.assertEqual(db.tt['aa'],db.tt.aa)
self.assertEqual(db.tt(1).aa,t0)
self.assertTrue(db.tt(1,aa=None)==None)
self.assertFalse(db.tt(1,aa=t0)==None)
self.assertEqual(row.aa,t0)
self.assertEqual(row['aa'],t0)
self.assertEqual(row['tt.aa'],t0)
self.assertEqual(row('tt.aa'),t0)
## Lazy and Virtual fields
db.tt.b = Field.Virtual(lambda row: row.tt.aa)
db.tt.c = Field.Lazy(lambda row: row.tt.aa)
row = db().select(db.tt.aa)[0]
self.assertEqual(row.b,t0)
self.assertEqual(row.c(),t0)
db.tt.drop()
db.define_table('tt', Field('aa', 'time', default='11:30'))
t0 = datetime.time(10, 30, 55)
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
db.tt.drop()
# aggregation type detection
db.define_table('tt', Field('aa', 'datetime',
default=datetime.datetime.today()))
t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0)
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa.min())[0][db.tt.aa.min()], t0)
db.tt.drop()
class TestTables(unittest.TestCase):
def testTableNames(self):
"""
- a "str" something
- not a method or property of DAL
- "dotted-notation" friendly:
- a valid python identifier
- not a python keyword
- not starting with underscore or an integer
- not containing dots
Basically, anything alphanumeric, no symbols, only underscore as
punctuation
"""
# Check that Tables cannot start with underscores
self.assertRaises(SyntaxError, Table, None, '_abc')
# Check that Tables cannot contain punctuation other than underscores
self.assertRaises(SyntaxError, Table, None, 'a.bc')
# Check that Tables cannot be a name of a method or property of DAL
for x in ['define_table', 'tables', 'as_dict']:
self.assertRaises(SyntaxError, Table, None, x)
# Check that Table allows underscores in the body of a field name.
self.assertTrue(Table(None, 'a_bc'),
"Table isn't allowing underscores in tablename. It should.")
# Check that Table names don't allow a python keyword
self.assertRaises(SyntaxError, Table, None, 'True')
self.assertRaises(SyntaxError, Table, None, 'elif')
self.assertRaises(SyntaxError, Table, None, 'while')
# Check that Table names don't allow a non-valid python identifier
non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
for a in non_valid_examples:
self.assertRaises(SyntaxError, Table, None, a)
# Check that Table names don't allow a unicode string
non_valid_examples = ["ℙƴ☂ℌøἤ", u"ℙƴ☂ℌøἤ",
u'àè', u'ṧøмℯ', u'тεṧт', u'♥αłüℯṧ',
u'ℊεᾔ℮яαт℮∂', u'♭ƴ', u'ᾔ☤ρℌℓ☺ḓ']
for a in non_valid_examples:
self.assertRaises(SyntaxError, Table, None, a)
class TestAll(unittest.TestCase):
def setUp(self):
self.pt = Table(None,'PseudoTable',Field('name'),Field('birthdate'))
def testSQLALL(self):
ans = 'PseudoTable.id, PseudoTable.name, PseudoTable.birthdate'
self.assertEqual(str(SQLALL(self.pt)), ans)
class TestTable(DALtest):
def testTableCreation(self):
# Check for error when not passing type other than Field or Table
self.assertRaises(SyntaxError, Table, None, 'test', None)
persons = Table(None, 'persons',
Field('firstname','string'),
Field('lastname', 'string'))
# Does it have the correct fields?
self.assertTrue(set(persons.fields).issuperset(set(['firstname',
'lastname'])))
# ALL is set correctly
self.assertTrue('persons.firstname, persons.lastname'
in str(persons.ALL))
def testTableAlias(self):
db = self.connect()
persons = Table(db, 'persons', Field('firstname',
'string'), Field('lastname', 'string'))
aliens = persons.with_alias('aliens')
# Are the different table instances with the same fields
self.assertTrue(persons is not aliens)
self.assertTrue(set(persons.fields) == set(aliens.fields))
def testTableInheritance(self):
persons = Table(None, 'persons', Field('firstname',
'string'), Field('lastname', 'string'))
customers = Table(None, 'customers',
Field('items_purchased', 'integer'),
persons)
self.assertTrue(set(customers.fields).issuperset(set(
['items_purchased', 'firstname', 'lastname'])))
class TestInsert(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa'))
self.assertEqual(db.tt.insert(aa='1'), 1)
if not IS_TERADATA:
self.assertEqual(db.tt.insert(aa='1'), 2)
self.assertEqual(db.tt.insert(aa='1'), 3)
else:
self.assertEqual(db.tt.insert(aa='1'), 1)
self.assertEqual(db.tt.insert(aa='1'), 1)
self.assertEqual(db(db.tt.aa == '1').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
self.assertEqual(db(db.tt.aa == '2').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), False)
self.assertEqual(db(db.tt.aa == '2').delete(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
class TestSelect(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa'))
self.assertEqual(db.tt.insert(aa='1'), 1)
if not IS_TERADATA:
self.assertEqual(db.tt.insert(aa='2'), 2)
self.assertEqual(db.tt.insert(aa='3'), 3)
else:
self.assertEqual(db.tt.insert(aa='2'), 1)
self.assertEqual(db.tt.insert(aa='3'), 1)
self.assertEqual(db(db.tt.id > 0).count(), 3)
self.assertEqual(db(db.tt.id > 0).select(orderby=~db.tt.aa
| db.tt.id)[0].aa, '3')
self.assertEqual(len(db(db.tt.id > 0).select(limitby=(1, 2))), 1)
self.assertEqual(db(db.tt.id > 0).select(limitby=(1, 2))[0].aa, '2')
self.assertEqual(len(db().select(db.tt.ALL)), 3)
self.assertEqual(db(db.tt.aa == None).count(), 0)
self.assertEqual(db(db.tt.aa != None).count(), 3)
self.assertEqual(db(db.tt.aa > '1').count(), 2)
self.assertEqual(db(db.tt.aa >= '1').count(), 3)
self.assertEqual(db(db.tt.aa == '1').count(), 1)
self.assertEqual(db(db.tt.aa != '1').count(), 2)
self.assertEqual(db(db.tt.aa < '3').count(), 2)
self.assertEqual(db(db.tt.aa <= '3').count(), 3)
self.assertEqual(db(db.tt.aa > '1')(db.tt.aa < '3').count(), 1)
self.assertEqual(db((db.tt.aa > '1') & (db.tt.aa < '3')).count(), 1)
self.assertEqual(db((db.tt.aa > '1') | (db.tt.aa < '3')).count(), 3)
self.assertEqual(db((db.tt.aa > '1') & ~(db.tt.aa > '2')).count(), 1)
self.assertEqual(db(~(db.tt.aa > '1') & (db.tt.aa > '2')).count(), 0)
# Test for REGEX_TABLE_DOT_FIELD
self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa], '1')
def testTestQuery(self):
db = self.connect()
db._adapter.test_connection()
def testListInteger(self):
db = self.connect()
db.define_table('tt',
Field('aa', 'list:integer'))
l=[1,2,3,4,5]
db.tt.insert(aa=l)
self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa],l)
def testListString(self):
db = self.connect()
db.define_table('tt',
Field('aa', 'list:string'))
l=['a', 'b', 'c']
db.tt.insert(aa=l)
self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa],l)
def testListReference(self):
db = self.connect()
db.define_table('t0', Field('aa', 'string'))
db.define_table('tt', Field('t0_id', 'list:reference t0'))
id_a1=db.t0.insert(aa='test1')
id_a2=db.t0.insert(aa='test2')
ref1=[id_a1]
ref2=[id_a2]
ref3=[id_a1, id_a2]
db.tt.insert(t0_id=ref1)
self.assertEqual(
db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1)
db.tt.insert(t0_id=ref2)
self.assertEqual(
db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2)
db.tt.insert(t0_id=ref3)
self.assertEqual(
db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3)
self.assertEqual(db(db.tt.t0_id == ref3).count(), 1)
def testGroupByAndDistinct(self):
db = self.connect()
db.define_table('tt',
Field('aa'),
Field('bb', 'integer'),
Field('cc', 'integer'))
db.tt.insert(aa='4', bb=1, cc=1)
db.tt.insert(aa='3', bb=2, cc=1)
db.tt.insert(aa='3', bb=1, cc=1)
db.tt.insert(aa='1', bb=1, cc=1)
db.tt.insert(aa='1', bb=2, cc=1)
db.tt.insert(aa='1', bb=3, cc=1)
db.tt.insert(aa='1', bb=4, cc=1)
db.tt.insert(aa='2', bb=1, cc=1)
db.tt.insert(aa='2', bb=2, cc=1)
db.tt.insert(aa='2', bb=3, cc=1)
self.assertEqual(db(db.tt).count(), 10)
# test groupby
result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa)
self.assertEqual(len(result), 4)
result = db().select(db.tt.aa, db.tt.bb.sum(),
groupby=db.tt.aa, orderby=db.tt.aa)
self.assertEqual(tuple(result.response[2]), ('3', 3))
result = db().select(db.tt.aa, db.tt.bb.sum(),
groupby=db.tt.aa, orderby=~db.tt.aa)
self.assertEqual(tuple(result.response[1]), ('3', 3))
result = db().select(db.tt.aa, db.tt.bb, db.tt.cc.sum(),
groupby=db.tt.aa|db.tt.bb,
orderby=(db.tt.aa|~db.tt.bb))
self.assertEqual(tuple(result.response[4]), ('2', 3, 1))
result = db().select(db.tt.aa, db.tt.bb.sum(),
groupby=db.tt.aa, orderby=~db.tt.aa, limitby=(1,2))
self.assertEqual(len(result), 1)
self.assertEqual(tuple(result.response[0]), ('3', 3))
result = db().select(db.tt.aa, db.tt.bb.sum(),
groupby=db.tt.aa, limitby=(0,3))
self.assertEqual(len(result), 3)
self.assertEqual(tuple(result.response[2]), ('3', 3))
# test having
self.assertEqual(len(db().select(db.tt.aa, db.tt.bb.sum(),
groupby=db.tt.aa, having=db.tt.bb.sum() > 2)), 3)
# test distinct
result = db().select(db.tt.aa, db.tt.cc, distinct=True)
self.assertEqual(len(result), 4)
result = db().select(db.tt.cc, distinct=True, groupby=db.tt.cc)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].cc, 1)
result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa)
self.assertEqual(result[2].aa, '2')
self.assertEqual(result[1].aa, '3')
result = db().select(db.tt.aa, db.tt.bb,
distinct=True, orderby=(db.tt.aa|~db.tt.bb))
self.assertEqual(tuple(result.response[4]), ('2', 3))
result = db().select(db.tt.aa,
distinct=True, orderby=~db.tt.aa, limitby=(1,2))
self.assertEqual(len(result), 1)
self.assertEqual(result[0].aa, '3')
# test count distinct
db.tt.insert(aa='2', bb=3, cc=1)
self.assertEqual(db(db.tt).count(distinct=db.tt.aa), 4)
self.assertEqual(db(db.tt.aa).count(db.tt.aa), 4)
self.assertEqual(db(db.tt.aa).count(), 11)
count=db.tt.aa.count()
self.assertEqual(db(db.tt).select(count).first()[count], 11)
count=db.tt.aa.count(distinct=True)
sum=db.tt.bb.sum()
result = db(db.tt).select(count, sum)
self.assertEqual(tuple(result.response[0]), (4, 23))
def testCoalesce(self):
db = self.connect()
db.define_table('tt', Field('aa'), Field('bb'), Field('cc'), Field('dd'))
db.tt.insert(aa='xx')
db.tt.insert(aa='xx', bb='yy')
db.tt.insert(aa='xx', bb='yy', cc='zz')
db.tt.insert(aa='xx', bb='yy', cc='zz', dd='')
result = db(db.tt).select(db.tt.dd.coalesce(db.tt.cc, db.tt.bb, db.tt.aa))
self.assertEqual(result.response[0][0], 'xx')
self.assertEqual(result.response[1][0], 'yy')
self.assertEqual(result.response[2][0], 'zz')
self.assertEqual(result.response[3][0], '')
db.tt.drop()
db.define_table('tt', Field('aa', 'integer'), Field('bb'))
db.tt.insert(bb='')
db.tt.insert(aa=1)
result = db(db.tt).select(db.tt.aa.coalesce_zero())
self.assertEqual(result.response[0][0], 0)
self.assertEqual(result.response[1][0], 1)
def testTableAliasCollisions(self):
db = self.connect()
db.define_table('t1', Field('aa'))
db.define_table('t2', Field('bb'))
t1, t2 = db.t1, db.t2
t1.with_alias('t2')
t2.with_alias('t1')
# Passing tables by name will result in exception
t1.insert(aa='test')
t2.insert(bb='foo')
db(t1.id > 0).update(aa='bar')
having = (t1.aa != None)
join = [t2.on(t1.aa == t2.bb)]
db(t1.aa == t2.bb).select(t1.aa, groupby=t1.aa, having=having,
orderby=t1.aa)
db(t1.aa).select(t1.aa, join=join, groupby=t1.aa, having=having,
orderby=t1.aa)
db(t1.aa).select(t1.aa, left=join, groupby=t1.aa, having=having,
orderby=t1.aa)
db(t1.id > 0).delete()
class TestSubselect(DALtest):
def testMethods(self):
db = self.connect()
db.define_table('tt', Field('aa', 'integer'), Field('bb'))
data = [
dict(aa=1, bb='foo'), dict(aa=1, bb='bar'), dict(aa=2, bb='foo')
]
for item in data:
db.tt.insert(**item)
fields = [db.tt.aa, db.tt.bb, db.tt.aa+2,
(db.tt.aa+1).with_alias('exp')]
sub = db(db.tt).nested_select(*fields, orderby=db.tt.id)
# Check the fields provided by the object
self.assertEqual(sorted(['aa', 'bb', 'exp']), sorted(list(sub.fields)))
for name in sub.fields:
self.assertIsInstance(sub[name], Field)
for item in sub:
self.assertIsInstance(item, Field)
self.assertEqual(len(list(sub)), len(sub.fields))
for key, val in zip(sub.fields, sub):
self.assertIs(sub[key], val)
self.assertIs(getattr(sub, key), val)
tmp = sub._filter_fields(dict(aa=1, exp=2, foo=3))
self.assertEqual(tmp, dict(aa=1, exp=2))
# Check result from executing the query
result = sub()
self.assertEqual(len(result), len(data))
for idx, row in enumerate(data):
self.assertEqual(result[idx]['tt'].as_dict(), row)
self.assertEqual(result[idx]['exp'], row['aa']+1)
result = db.executesql(str(sub))
for idx, row in enumerate(data):
tmp = [row['aa'], row['bb'], row['aa']+2, row['aa']+1]
self.assertEqual(list(result[idx]), tmp)
# Check that query expansion methods don't work without alias
self.assertEqual(sub._rname, None)
self.assertEqual(sub._raw_rname, None)
self.assertEqual(sub._dalname, None)
with self.assertRaises(SyntaxError):
sub.query_name()
with self.assertRaises(SyntaxError):
sub.sql_shortref
with self.assertRaises(SyntaxError):
sub.on(sub.aa != None)
# Alias checks
sub = sub.with_alias('foo')
result = sub()
for idx, row in enumerate(data):
self.assertEqual(result[idx]['tt'].as_dict(), row)
self.assertEqual(result[idx]['exp'], row['aa']+1)
# Check query expansion methods again
self.assertEqual(sub._rname, None)
self.assertEqual(sub._raw_rname, None)
self.assertEqual(sub._dalname, None)
self.assertEqual(sub.query_name()[0], str(sub))
self.assertEqual(sub.sql_shortref, db._adapter.dialect.quote('foo'))
self.assertIsInstance(sub.on(sub.aa != None), Expression)
def testSelectArguments(self):
db = self.connect()
db.define_table('tt', Field('aa', 'integer'), Field('bb'))
data = [
dict(aa=1, bb='foo'), dict(aa=1, bb='bar'), dict(aa=2, bb='foo'),
dict(aa=3, bb='foo'), dict(aa=3, bb='baz')
]
expected = [(1, None, 0), (2, 2, 2), (2, 2, 2), (3, 4, 3), (3, 8, 6)]
for item in data:
db.tt.insert(**item)
# Check that select clauses work as expected in stand-alone query
t1 = db.tt.with_alias('t1')
t2 = db.tt.with_alias('t2')
fields = [t1.aa, t2.aa.sum().with_alias('total'),
t2.aa.count().with_alias('cnt')]
join = t1.on(db.tt.bb != t1.bb)
left = t2.on(t1.aa > t2.aa)
group = db.tt.bb | t1.aa
having = db.tt.aa.count() > 1
order = t1.aa | t2.aa.count()
limit = (1,6)
sub = db(db.tt.aa != 2).nested_select(*fields, join=join, left=left,
orderby=order, groupby=group, having=having, limitby=limit)
result = sub()
self.assertEqual(len(result), len(expected))
for idx, val in enumerate(expected):
self.assertEqual(result[idx]['t1']['aa'], val[0])
self.assertEqual(result[idx]['total'], val[1])
self.assertEqual(result[idx]['cnt'], val[2])
# Check again when nested inside another query
# Also check that the alias will not conflict with existing table
t3 = db.tt.with_alias('t3')
sub = sub.with_alias('tt')
query = (t3.bb == 'foo') & (t3.aa == sub.aa)
order = t3.aa | sub.cnt
result = db(query).select(t3.aa, sub.total, sub.cnt, orderby=order)
for idx, val in enumerate(expected):
self.assertEqual(result[idx]['t3']['aa'], val[0])
self.assertEqual(result[idx]['tt']['total'], val[1])
self.assertEqual(result[idx]['tt']['cnt'], val[2])
# Check "distinct" modifier separately
sub = db(db.tt.aa != 2).nested_select(db.tt.aa, distinct=True)
result = sub().as_list()
self.assertEqual(result, [dict(aa=1), dict(aa=3)])
def testCorrelated(self):
db = self.connect()
db.define_table('t1', Field('aa', 'integer'), Field('bb'),
Field('mark', 'integer'))
db.define_table('t2', Field('aa', 'integer'), Field('cc'))
db.define_table('t3', Field('aa', 'integer'))
data_t1 = [
dict(aa=1, bb='bar'), dict(aa=1, bb='foo'), dict(aa=2, bb='foo'),
dict(aa=2, bb='test'), dict(aa=3, bb='baz'), dict(aa=3, bb='foo')
]
data_t2 = [
dict(aa=1, cc='foo'), dict(aa=2, cc='bar'), dict(aa=3, cc='baz')
]
expected_cor = [(1, 'foo'), (3, 'baz')]
expected_leftcor = [(1, 'foo'), (2, None), (3, 'baz')]
expected_uncor = [
(1, 'bar'), (1, 'foo'), (2, 'foo'), (3, 'baz'), (3, 'foo')
]
for item in data_t1:
db.t1.insert(**item)
for item in data_t2:
db.t2.insert(**item)
db.t3.insert(aa=item['aa'])
# Correlated subqueries
subquery = db.t1.aa == db.t2.aa
subfields = [db.t2.cc]
sub = db(subquery).nested_select(*subfields).with_alias('sub')
query = db.t1.bb.belongs(sub)
order = db.t1.aa | db.t1.bb
result = db(query).select(db.t1.aa, db.t1.bb, orderby=order)
self.assertEqual(len(result), len(expected_cor))
for idx, val in enumerate(expected_cor):
self.assertEqual(result[idx]['aa'], val[0])
self.assertEqual(result[idx]['bb'], val[1])
join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
order = db.t3.aa | db.t1.bb
result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join, orderby=order)
self.assertEqual(len(result), len(expected_cor))
for idx, val in enumerate(expected_cor):
self.assertEqual(result[idx]['t3']['aa'], val[0])
self.assertEqual(result[idx]['t1']['bb'], val[1])
left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left, orderby=order)
self.assertEqual(len(result), len(expected_leftcor))
for idx, val in enumerate(expected_leftcor):
self.assertEqual(result[idx]['t3']['aa'], val[0])
self.assertEqual(result[idx]['t1']['bb'], val[1])
order = db.t1.aa | db.t1.bb
db(db.t1.bb.belongs(sub)).update(mark=1)
result = db(db.t1.mark == 1).select(db.t1.aa, db.t1.bb, orderby=order)
self.assertEqual(len(result), len(expected_cor))
for idx, val in enumerate(expected_cor):
self.assertEqual(result[idx]['aa'], val[0])
self.assertEqual(result[idx]['bb'], val[1])
db(~db.t1.bb.belongs(sub)).delete()
result = db(db.t1.id > 0).select(db.t1.aa, db.t1.bb, orderby=order)
self.assertEqual(len(result), len(expected_cor))
for idx, val in enumerate(expected_cor):
self.assertEqual(result[idx]['aa'], val[0])
self.assertEqual(result[idx]['bb'], val[1])
db(db.t1.id > 0).delete()
for item in data_t1:
db.t1.insert(**item)
# Uncorrelated subqueries
kwargs = dict(correlated=False)
sub = db(subquery).nested_select(*subfields, **kwargs)
query = db.t1.bb.belongs(sub)
order = db.t1.aa | db.t1.bb
result = db(query).select(db.t1.aa, db.t1.bb, orderby=order)
self.assertEqual(len(result), len(expected_uncor))
for idx, val in enumerate(expected_uncor):
self.assertEqual(result[idx]['aa'], val[0])
self.assertEqual(result[idx]['bb'], val[1])
join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
order = db.t3.aa | db.t1.bb
result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join, orderby=order)
self.assertEqual(len(result), len(expected_uncor))
for idx, val in enumerate(expected_uncor):
self.assertEqual(result[idx]['t3']['aa'], val[0])
self.assertEqual(result[idx]['t1']['bb'], val[1])
left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left, orderby=order)
self.assertEqual(len(result), len(expected_uncor))
for idx, val in enumerate(expected_uncor):
self.assertEqual(result[idx]['t3']['aa'], val[0])
self.assertEqual(result[idx]['t1']['bb'], val[1])
# MySQL does not support subqueries with uncorrelated references
# to target table
# Correlation prevented by alias in parent select
tmp = db.t1.with_alias('tmp')
sub = db(subquery).nested_select(*subfields)
query = tmp.bb.belongs(sub)
order = tmp.aa | tmp.bb
result = db(query).select(tmp.aa, tmp.bb, orderby=order)
self.assertEqual(len(result), len(expected_uncor))
for idx, val in enumerate(expected_uncor):
self.assertEqual(result[idx]['aa'], val[0])
self.assertEqual(result[idx]['bb'], val[1])
join = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))]
order = db.t3.aa | tmp.bb
result = db(db.t3).select(db.t3.aa, tmp.bb, join=join, orderby=order)
self.assertEqual(len(result), len(expected_uncor))
for idx, val in enumerate(expected_uncor):
self.assertEqual(result[idx]['t3']['aa'], val[0])
self.assertEqual(result[idx]['tmp']['bb'], val[1])
left = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))]
result = db(db.t3).select(db.t3.aa, tmp.bb, left=left, orderby=order)
self.assertEqual(len(result), len(expected_uncor))
for idx, val in enumerate(expected_uncor):
self.assertEqual(result[idx]['t3']['aa'], val[0])
self.assertEqual(result[idx]['tmp']['bb'], val[1])
# SQLite does not support aliasing target table in UPDATE/DELETE
# MySQL does not support subqueries with uncorrelated references
# to target table
class TestAddMethod(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa'))
@db.tt.add_method.all
def select_all(table,orderby=None):
return table._db(table).select(orderby=orderby)
self.assertEqual(db.tt.insert(aa='1'), 1)
if not IS_TERADATA:
self.assertEqual(db.tt.insert(aa='1'), 2)
self.assertEqual(db.tt.insert(aa='1'), 3)
else:
self.assertEqual(db.tt.insert(aa='1'), 1)
self.assertEqual(db.tt.insert(aa='1'), 1)
self.assertEqual(len(db.tt.all()), 3)
class TestBelongs(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa'))
self.assertEqual(db.tt.insert(aa='1'), 1)
if not IS_TERADATA:
self.assertEqual(db.tt.insert(aa='2'), 2)
self.assertEqual(db.tt.insert(aa='3'), 3)
else:
self.assertEqual(db.tt.insert(aa='2'), 1)
self.assertEqual(db.tt.insert(aa='3'), 1)
self.assertEqual(db(db.tt.aa.belongs(('1', '3'))).count(),
2)
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.id
> 2)._select(db.tt.aa))).count(), 1)
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.aa.belongs(('1',
'3')))._select(db.tt.aa))).count(), 2)
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.aa.belongs(db
(db.tt.aa.belongs(('1', '3')))._select(db.tt.aa)))._select(
db.tt.aa))).count(),
2)
class TestContains(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa', 'list:string'), Field('bb','string'))
self.assertEqual(db.tt.insert(aa=['aaa','bbb'],bb='aaa'), 1)
if not IS_TERADATA:
self.assertEqual(db.tt.insert(aa=['bbb','ddd'],bb='abb'), 2)
self.assertEqual(db.tt.insert(aa=['eee','aaa'],bb='acc'), 3)
else:
self.assertEqual(db.tt.insert(aa=['bbb','ddd'],bb='abb'), 1)
self.assertEqual(db.tt.insert(aa=['eee','aaa'],bb='acc'), 1)
self.assertEqual(db(db.tt.aa.contains('aaa')).count(), 2)
self.assertEqual(db(db.tt.aa.contains('bbb')).count(), 2)
self.assertEqual(db(db.tt.aa.contains('aa')).count(), 0)
self.assertEqual(db(db.tt.bb.contains('a')).count(), 3)
self.assertEqual(db(db.tt.bb.contains('b')).count(), 1)
self.assertEqual(db(db.tt.bb.contains('d')).count(), 0)
self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
#case-sensitivity tests, if 1 it isn't
is_case_insensitive = db(db.tt.bb.like('%AA%')).count()
if is_case_insensitive:
self.assertEqual(db(db.tt.aa.contains('AAA')).count(), 2)
self.assertEqual(db(db.tt.bb.contains('A')).count(), 3)
else:
self.assertEqual(db(db.tt.aa.contains('AAA', case_sensitive=True)).count(), 0)
self.assertEqual(db(db.tt.bb.contains('A', case_sensitive=True)).count(), 0)
self.assertEqual(db(db.tt.aa.contains('AAA', case_sensitive=False)).count(), 2)
self.assertEqual(db(db.tt.bb.contains('A', case_sensitive=False)).count(), 3)
db.tt.drop()
# integers in string fields
db.define_table('tt', Field('aa', 'list:string'), Field('bb','string'), Field('cc','integer'))
self.assertEqual(db.tt.insert(aa=['123','456'],bb='123', cc=12), 1)
if not IS_TERADATA:
self.assertEqual(db.tt.insert(aa=['124','456'],bb='123', cc=123), 2)
self.assertEqual(db.tt.insert(aa=['125','457'],bb='23', cc=125), 3)
else:
self.assertEqual(db.tt.insert(aa=['124','456'],bb='123', cc=123), 1)
self.assertEqual(db.tt.insert(aa=['125','457'],bb='23', cc=125), 1)
self.assertEqual(db(db.tt.aa.contains(123)).count(), 1)
self.assertEqual(db(db.tt.aa.contains(23)).count(), 0)
self.assertEqual(db(db.tt.aa.contains(db.tt.cc)).count(), 1)
self.assertEqual(db(db.tt.bb.contains(123)).count(), 2)
self.assertEqual(db(db.tt.bb.contains(23)).count(), 3)
self.assertEqual(db(db.tt.bb.contains(db.tt.cc)).count(), 2)
db.tt.drop()
# string field contains string field
db.define_table('tt', Field('aa'), Field('bb'))
db.tt.insert(aa='aaa', bb='%aaa')
db.tt.insert(aa='aaa', bb='aaa')
self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
db.tt.drop()
#escaping
db.define_table('tt', Field('aa'))
db.tt.insert(aa='perc%ent')
db.tt.insert(aa='percent')
db.tt.insert(aa='percxyzent')
db.tt.insert(aa='under_score')
db.tt.insert(aa='underxscore')
db.tt.insert(aa='underyscore')
self.assertEqual(db(db.tt.aa.contains('perc%ent')).count(), 1)
self.assertEqual(db(db.tt.aa.contains('under_score')).count(), 1)
class TestLike(DALtest):
def setUp(self):
db = self.connect()
db.define_table('tt', Field('aa'))
self.assertEqual(isinstance(db.tt.insert(aa='abc'), long), True)
self.db = db
def testRun(self):
db = self.db
self.assertEqual(db(db.tt.aa.like('a%')).count(), 1)
self.assertEqual(db(db.tt.aa.like('%b%')).count(), 1)
self.assertEqual(db(db.tt.aa.like('%c')).count(), 1)
self.assertEqual(db(db.tt.aa.like('%d%')).count(), 0)
self.assertEqual(db(db.tt.aa.like('ab_')).count(), 1)
self.assertEqual(db(db.tt.aa.like('a_c')).count(), 1)
self.assertEqual(db(db.tt.aa.like('_bc')).count(), 1)
self.assertEqual(db(db.tt.aa.like('A%', case_sensitive=False)).count(), 1)
self.assertEqual(db(db.tt.aa.like('%B%', case_sensitive=False)).count(), 1)
self.assertEqual(db(db.tt.aa.like('%C', case_sensitive=False)).count(), 1)
self.assertEqual(db(db.tt.aa.ilike('A%')).count(), 1)
self.assertEqual(db(db.tt.aa.ilike('%B%')).count(), 1)
self.assertEqual(db(db.tt.aa.ilike('%C')).count(), 1)
#DAL maps like() (and contains(), startswith(), endswith())
#to the LIKE operator, that in ANSI-SQL is case-sensitive
#There are backends supporting case-sensitivity by default
#and backends that needs additional care to turn
#case-sensitivity on. To discern among those, let's run
#this query comparing previously inserted 'abc' with 'ABC':
#if the result is 0, then the backend recognizes
#case-sensitivity, if 1 it isn't
is_case_insensitive = db(db.tt.aa.like('%ABC%')).count()
self.assertEqual(db(db.tt.aa.like('A%')).count(), is_case_insensitive)
self.assertEqual(db(db.tt.aa.like('%B%')).count(), is_case_insensitive)
self.assertEqual(db(db.tt.aa.like('%C')).count(), is_case_insensitive)
def testUpperLower(self):
db = self.db
self.assertEqual(db(db.tt.aa.upper().like('A%')).count(), 1)
self.assertEqual(db(db.tt.aa.upper().like('%B%')).count(),1)
self.assertEqual(db(db.tt.aa.upper().like('%C')).count(), 1)
self.assertEqual(db(db.tt.aa.lower().like('%c')).count(), 1)
def testStartsEndsWith(self):
db = self.db
self.assertEqual(db(db.tt.aa.startswith('a')).count(), 1)
self.assertEqual(db(db.tt.aa.endswith('c')).count(), 1)
self.assertEqual(db(db.tt.aa.startswith('c')).count(), 0)
self.assertEqual(db(db.tt.aa.endswith('a')).count(), 0)
def testEscaping(self):
db = self.db
term = 'ahbc'.replace('h', '\\') #funny but to avoid any doubts...
db.tt.insert(aa='a%bc')
db.tt.insert(aa='a_bc')
db.tt.insert(aa=term)
self.assertEqual(db(db.tt.aa.like('%ax%bc%', escape='x')).count(), 1)
self.assertEqual(db(db.tt.aa.like('%ax_bc%', escape='x')).count(), 1)
self.assertEqual(db(db.tt.aa.like('%'+term+'%')).count(), 1)
db(db.tt.id>0).delete()
# test "literal" like, i.e. exactly as LIKE in the backend
db.tt.insert(aa='perc%ent')
db.tt.insert(aa='percent')
db.tt.insert(aa='percxyzent')
db.tt.insert(aa='under_score')
db.tt.insert(aa='underxscore')
db.tt.insert(aa='underyscore')
self.assertEqual(db(db.tt.aa.like('%perc%ent%')).count(), 3)
self.assertEqual(db(db.tt.aa.like('%under_score%')).count(), 3)
db(db.tt.id>0).delete()
# escaping with startswith and endswith
db.tt.insert(aa='%percent')
db.tt.insert(aa='xpercent')
db.tt.insert(aa='discount%')
db.tt.insert(aa='discountx')
self.assertEqual(db(db.tt.aa.endswith('discount%')).count(), 1)
self.assertEqual(db(db.tt.aa.like('discount%%')).count(), 2)
self.assertEqual(db(db.tt.aa.startswith('%percent')).count(), 1)
self.assertEqual(db(db.tt.aa.like('%%percent')).count(), 2)
@unittest.skipIf(IS_MSSQL, "No Regexp on MSSQL")
def testRegexp(self):
db = self.db
db(db.tt.id>0).delete()
db.tt.insert(aa='%percent')
db.tt.insert(aa='xpercent')
db.tt.insert(aa='discount%')
db.tt.insert(aa='discountx')
try:
self.assertEqual(db(db.tt.aa.regexp('count')).count(), 2)
except NotImplementedError:
pass
else:
self.assertEqual(db(db.tt.aa.lower().regexp('count')).count(), 2)
self.assertEqual(db(db.tt.aa.upper().regexp('COUNT') &
db.tt.aa.lower().regexp('count')).count(), 2)
self.assertEqual(db(db.tt.aa.upper().regexp('COUNT') |
(db.tt.aa.lower()=='xpercent')).count(), 3)
def testLikeInteger(self):
db = self.db
db.tt.drop()
db.define_table('tt', Field('aa', 'integer'))
self.assertEqual(isinstance(db.tt.insert(aa=1111111111), long), True)
self.assertEqual(isinstance(db.tt.insert(aa=1234567), long), True)
self.assertEqual(db(db.tt.aa.like('1%')).count(), 2)
self.assertEqual(db(db.tt.aa.like('1_3%')).count(), 1)
self.assertEqual(db(db.tt.aa.like('2%')).count(), 0)
self.assertEqual(db(db.tt.aa.like('_2%')).count(), 1)
self.assertEqual(db(db.tt.aa.like('12%')).count(), 1)
self.assertEqual(db(db.tt.aa.like('012%')).count(), 0)
self.assertEqual(db(db.tt.aa.like('%45%')).count(), 1)
self.assertEqual(db(db.tt.aa.like('%54%')).count(), 0)
class TestDatetime(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa', 'datetime'))
self.assertEqual(db.tt.insert(aa=datetime.datetime(1971, 12, 21,
11, 30)), 1)
self.assertEqual(db.tt.insert(aa=datetime.datetime(1971, 11, 21,
10, 30)), 2)
self.assertEqual(db.tt.insert(aa=datetime.datetime(1970, 12, 21,
9, 31)), 3)
self.assertEqual(db(db.tt.aa == datetime.datetime(1971, 12,
21, 11, 30)).count(), 1)
self.assertEqual(db(db.tt.aa.year() == 1971).count(), 2)
self.assertEqual(db(db.tt.aa.month() > 11).count(), 2)
self.assertEqual(db(db.tt.aa.day() >= 21).count(), 3)
self.assertEqual(db(db.tt.aa.hour() < 10).count(), 1)
self.assertEqual(db(db.tt.aa.minutes() <= 30).count(), 2)
self.assertEqual(db(db.tt.aa.seconds() != 31).count(), 3)
self.assertEqual(db(db.tt.aa.epoch() < 365*24*3600).delete(), 1)
db.tt.drop()
db.define_table('tt', Field('aa', 'time'))
t0 = datetime.time(10, 30, 55)
db.tt.insert(aa=t0)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
db.tt.drop()
db.define_table('tt', Field('aa', 'date'))
t0 = datetime.date.today()
db.tt.insert(aa=t0)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
class TestExpressions(DALtest):
@unittest.skipIf(IS_POSTGRESQL, "PG8000 does not like these")
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa', 'integer'),
Field('bb', 'integer'), Field('cc'))
self.assertEqual(db.tt.insert(aa=1, bb=0), 1)
self.assertEqual(db.tt.insert(aa=2, bb=0), 2)
self.assertEqual(db.tt.insert(aa=3, bb=0), 3)
# test update
self.assertEqual(db(db.tt.aa == 3).update(aa=db.tt.aa + 1,
bb=db.tt.bb + 2), 1)
self.assertEqual(db(db.tt.aa == 4).count(), 1)
self.assertEqual(db(db.tt.bb == 2).count(), 1)
self.assertEqual(db(db.tt.aa == -2).count(), 0)
self.assertEqual(db(db.tt.aa == 4).update(aa=db.tt.aa * 2, bb=5), 1)
self.assertEqual(db(db.tt.bb == 5).count(), 1)
self.assertEqual(db(db.tt.aa + 1 == 9).count(), 1)
self.assertEqual(db(db.tt.aa + 1 == 9).update(aa=db.tt.aa - 2,
cc='cc'), 1)
self.assertEqual(db(db.tt.cc == 'cc').count(), 1)
self.assertEqual(db(db.tt.aa == 6).count(), 1)
self.assertEqual(db(db.tt.aa == 6).update(bb=db.tt.aa *
(db.tt.bb - 3)), 1)
self.assertEqual(db(db.tt.bb == 12).count(), 1)
self.assertEqual(db(db.tt.aa == 6).count(), 1)
self.assertEqual(db(db.tt.aa == 6).update(aa=db.tt.aa % 4 + 1,
cc=db.tt.cc + '1' +'1'), 1)
self.assertEqual(db(db.tt.cc == 'cc11').count(), 1)
self.assertEqual(db(db.tt.aa == 3).count(), 1)
# test comparsion expression based count
self.assertEqual(db(db.tt.aa != db.tt.aa).count(), 0)
self.assertEqual(db(db.tt.aa == db.tt.aa).count(), 3)
# test select aggregations
sum = (db.tt.aa + 1).sum()
self.assertEqual(db(db.tt.aa + 1 >= 3).select(sum).first()[sum], 7)
self.assertEqual(db((1==0) & (db.tt.aa >= db.tt.aa)).count(), 0)
self.assertEqual(db(db.tt.aa * 2 == -2).select(sum).first()[sum], None)
count=db.tt.aa.count()
avg=db.tt.aa.avg()
min=db.tt.aa.min()
max=db.tt.aa.max()
result = db(db.tt).select(sum, count, avg, min, max).first()
self.assertEqual(result[sum], 9)
self.assertEqual(result[count], 3)
self.assertEqual(result[avg], 2)
self.assertEqual(result[min], 1)
self.assertEqual(result[max], 3)
# Test basic expressions evaluated at python level
self.assertEqual(db((1==1) & (db.tt.aa >= 2)).count(), 2)
self.assertEqual(db((1==1) | (db.tt.aa >= 2)).count(), 3)
self.assertEqual(db((1==0) & (db.tt.aa >= 2)).count(), 0)
self.assertEqual(db((1==0) | (db.tt.aa >= 2)).count(), 2)
# test abs()
self.assertEqual(db(db.tt.aa == 2).update(aa=db.tt.aa*-10), 1)
abs=db.tt.aa.abs().with_alias('abs')
result = db(db.tt.aa == -20).select(abs).first()
self.assertEqual(result[abs], 20)
self.assertEqual(result['abs'], 20)
abs=db.tt.aa.abs()/10+5
exp=abs.min()*2+1
result = db(db.tt.aa == -20).select(exp).first()
self.assertEqual(result[exp], 15)
# test case()
condition = db.tt.aa > 2
case = condition.case(db.tt.aa + 2, db.tt.aa - 2)
my_case = case.with_alias('my_case')
result = db().select(my_case)
self.assertEqual(len(result), 3)
self.assertEqual(result[0][my_case], -1)
self.assertEqual(result[0]['my_case'], -1)
self.assertEqual(result[1]['my_case'], -22)
self.assertEqual(result[2]['my_case'], 5)
# test expression based delete
self.assertEqual(db(db.tt.aa + 1 >= 4).count(), 1)
self.assertEqual(db(db.tt.aa + 1 >= 4).delete(), 1)
self.assertEqual(db(db.tt.aa).count(), 2)
def testUpdate(self):
db = self.connect()
# some db's only support seconds
datetime_datetime_today = datetime.datetime.today()
datetime_datetime_today = datetime_datetime_today.replace(
microsecond = 0)
one_day = datetime.timedelta(1)
one_sec = datetime.timedelta(0,1)
update_vals = (
('string', 'x', 'y'),
('text', 'x', 'y'),
('password', 'x', 'y'),
('integer', 1, 2),
('bigint', 1, 2),
('float', 1.0, 2.0),
('double', 1.0, 2.0),
('boolean', True, False),
('date', datetime.date.today(), datetime.date.today() + one_day),
('datetime', datetime.datetime(1971, 12, 21, 10, 30, 55, 0),
datetime_datetime_today),
('time', datetime_datetime_today.time(),
(datetime_datetime_today + one_sec).time()),
)
for uv in update_vals:
db.define_table('tt', Field('aa', 'integer', default=0),
Field('bb', uv[0]))
self.assertTrue(isinstance(db.tt.insert(bb=uv[1]), long))
self.assertEqual(db(db.tt.aa + 1 == 1).select(db.tt.bb)[0].bb, uv[1])
self.assertEqual(db(db.tt.aa + 1 == 1).update(bb=uv[2]), 1)
self.assertEqual(db(db.tt.aa / 3 == 0).select(db.tt.bb)[0].bb, uv[2])
db.tt.drop()
def testSubstring(self):
db = self.connect()
t0 = db.define_table('t0', Field('name'))
input_name = "web2py"
t0.insert(name=input_name)
exp_slice = t0.name.lower()[4:6]
exp_slice_no_max = t0.name.lower()[4:]
exp_slice_neg_max = t0.name.lower()[2:-2]
exp_slice_neg_start = t0.name.lower()[-2:]
exp_item = t0.name.lower()[3]
out = db(t0).select(exp_slice, exp_item, exp_slice_no_max, exp_slice_neg_max, exp_slice_neg_start).first()
self.assertEqual(out[exp_slice], input_name[4:6])
self.assertEqual(out[exp_item], input_name[3])
self.assertEqual(out[exp_slice_no_max], input_name[4:])
self.assertEqual(out[exp_slice_neg_max], input_name[2:-2])
self.assertEqual(out[exp_slice_neg_start], input_name[-2:])
def testOps(self):
db = self.connect()
t0 = db.define_table('t0', Field('vv', 'integer'))
self.assertEqual(db.t0.insert(vv=1), 1)
self.assertEqual(db.t0.insert(vv=2), 2)
self.assertEqual(db.t0.insert(vv=3), 3)
sum = db.t0.vv.sum()
count=db.t0.vv.count()
avg=db.t0.vv.avg()
op = sum/count
op1 = (sum/count).with_alias('tot')
self.assertEqual(db(t0).select(op).first()[op], 2)
self.assertEqual(db(t0).select(op1).first()[op1], 2)
self.assertEqual(db(t0).select(op1).first()['tot'], 2)
op2 = avg*count
self.assertEqual(db(t0).select(op2).first()[op2], 6)
# the following is not possible at least on sqlite
sum = db.t0.vv.sum().with_alias('s')
count=db.t0.vv.count().with_alias('c')
op = sum/count
#self.assertEqual(db(t0).select(op).first()[op], 2)
class TestTableAliasing(DALtest):
def testRun(self):
db = self.connect()
db.define_table('t1', Field('aa'))
db.define_table('t2',
Field('pk', type='id', unique=True, notnull=True),
Field('bb', type='integer'), rname='tt')
tab1 = db.t1.with_alias('test1')
tab2 = db.t2.with_alias('test2')
self.assertIs(tab2.id, tab2.pk)
self.assertIs(tab2._id, tab2.pk)
self.assertEqual(tab1._dalname, 't1')
self.assertEqual(tab1._tablename, 'test1')
self.assertEqual(tab2._dalname, 't2')
self.assertEqual(tab2._tablename, 'test2')
self.assertEqual(tab2._rname, 'tt')
tab1.insert(aa='foo')
tab1.insert(aa='bar')
result = db(tab1).select(tab1.aa, orderby=tab1.aa)
self.assertEqual(result.as_list(), [{'aa': 'bar'}, {'aa': 'foo'}])
if not IS_SQLITE:
db(tab1.aa == 'foo').update(aa='baz')
result = db(tab1).select(tab1.aa, orderby=tab1.aa)
self.assertEqual(result.as_list(), [{'aa': 'bar'}, {'aa': 'baz'}])
db(tab1.aa == 'bar').delete()
result = db(tab1).select(tab1.aa, orderby=tab1.aa)
self.assertEqual(result.as_list(), [{'aa': 'baz'}])
else:
with self.assertRaises(SyntaxError):
db(tab1.aa == 'foo').update(aa='baz')
with self.assertRaises(SyntaxError):
db(tab1.aa == 'bar').delete()
tab2.insert(bb=123)
tab2.insert(bb=456)
result = db(tab2).select(tab2.bb, orderby=tab2.bb)
self.assertEqual(result.as_list(), [{'bb': 123}, {'bb': 456}])
if not IS_SQLITE:
db(tab2.bb == 456).update(bb=789)
result = db(tab2).select(tab2.bb, orderby=tab2.bb)
self.assertEqual(result.as_list(), [{'bb': 123}, {'bb': 789}])
db(tab2.bb == 123).delete()
result = db(tab2).select(tab2.bb, orderby=tab2.bb)
self.assertEqual(result.as_list(), [{'bb': 789}])
else:
with self.assertRaises(SyntaxError):
db(tab2.bb == 456).update(bb=789)
with self.assertRaises(SyntaxError):
db(tab2.bb == 123).delete()
class TestJoin(DALtest):
def testRun(self):
db = self.connect()
db.define_table('t1', Field('aa'))
db.define_table('t2', Field('aa'), Field('b', db.t1))
i1 = db.t1.insert(aa='1')
i2 = db.t1.insert(aa='2')
i3 = db.t1.insert(aa='3')
db.t2.insert(aa='4', b=i1)
db.t2.insert(aa='5', b=i2)
db.t2.insert(aa='6', b=i2)
self.assertEqual(len(db(db.t1.id
== db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)), 3)
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t2.aa, '6')
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)), 4)
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa, groupby=db.t1.aa)),
3)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
1)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
2)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
0)
db.t2.drop()
db.t1.drop()
db.define_table('person',Field('name'))
id = db.person.insert(name="max")
self.assertEqual(id.name,'max')
db.define_table('dog',Field('name'),Field('ownerperson','reference person'))
db.dog.insert(name='skipper',ownerperson=1)
row = db(db.person.id==db.dog.ownerperson).select().first()
self.assertEqual(row[db.person.name],'max')
self.assertEqual(row['person.name'],'max')
db.dog.drop()
self.assertEqual(len(db.person._referenced_by),0)
class TestMinMaxSumAvg(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa', 'integer'))
self.assertEqual(db.tt.insert(aa=1), 1)
self.assertEqual(db.tt.insert(aa=2), 2)
self.assertEqual(db.tt.insert(aa=3), 3)
s = db.tt.aa.min()
self.assertEqual(db(db.tt.id > 0).select(s)[0]._extra[s], 1)
self.assertEqual(db(db.tt.id > 0).select(s).first()[s], 1)
self.assertEqual(db().select(s).first()[s], 1)
s = db.tt.aa.max()
self.assertEqual(db().select(s).first()[s], 3)
s = db.tt.aa.sum()
self.assertEqual(db().select(s).first()[s], 6)
s = db.tt.aa.count()
self.assertEqual(db().select(s).first()[s], 3)
s = db.tt.aa.avg()
self.assertEqual(db().select(s).first()[s], 2)
class TestMigrations(unittest.TestCase):
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'), Field('BB'),
migrate='.storage.table')
db.define_table('t1', Field('aa'), Field('BB'),
migrate='.storage.rname', rname='foo')
db.commit()
db.close()
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'), migrate='.storage.table')
db.define_table('t1', Field('aa'), migrate='.storage.rname',
rname='foo')
db.commit()
db.close()
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'), Field('b'),
migrate='.storage.table')
db.define_table('t1', Field('aa'), Field('b'),
migrate='.storage.rname', rname='foo')
db.commit()
db.close()
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'), Field('b', 'text'),
migrate='.storage.table')
db.define_table('t1', Field('aa'), Field('b', 'text'),
migrate='.storage.rname', rname='foo')
db.commit()
db.close()
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'), migrate='.storage.table')
db.define_table('t1', Field('aa'), migrate='.storage.rname',
rname='foo')
db.tt.drop()
db.t1.drop()
db.commit()
db.close()
def testFieldRName(self):
def checkWrite(db, table, data):
rowid = table.insert(**data)
query = (table._id == rowid)
fields = [table[x] for x in data.keys()]
row = db(query).select(*fields).first()
self.assertIsNot(row, None)
self.assertEqual(row.as_dict(), data)
db(query).delete()
# Create tables
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', rname='faa'),
Field('BB', rname='fbb'), migrate='.storage.table')
db.define_table('t1', Field('aa', rname='faa'),
Field('BB', rname='fbb'), migrate='.storage.rname', rname='foo')
data = dict(aa='aa1', BB='BB1')
checkWrite(db, db.tt, data)
checkWrite(db, db.t1, data)
db.commit()
db.close()
# Drop field defined by CREATE TABLE
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', rname='faa'),
migrate='.storage.table')
db.define_table('t1', Field('aa', rname='faa'),
migrate='.storage.rname', rname='foo')
data = dict(aa='aa2')
checkWrite(db, db.tt, data)
checkWrite(db, db.t1, data)
db.commit()
db.close()
# Add new field
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', rname='faa'), Field('b', rname='fb'),
migrate='.storage.table')
db.define_table('t1', Field('aa', rname='faa'), Field('b', rname='fb'),
migrate='.storage.rname', rname='foo')
data = dict(aa='aa3', b='b3')
integrity = dict(aa='data', b='integrity')
checkWrite(db, db.tt, data)
checkWrite(db, db.t1, data)
db.tt.insert(**integrity)
db.t1.insert(**integrity)
db.commit()
db.close()
# Change field type
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', rname='faa'),
Field('b', 'text', rname='fb'), migrate='.storage.table')
db.define_table('t1', Field('aa', rname='faa'),
Field('b', 'text', rname='fb'), migrate='.storage.rname',
rname='foo')
data = dict(aa='aa4', b='b4')
checkWrite(db, db.tt, data)
checkWrite(db, db.t1, data)
row = db(db.tt).select(*[db.tt[x] for x in integrity.keys()]).first()
self.assertIsNot(row, None)
self.assertEqual(row.as_dict(), integrity)
row2 = db(db.t1).select(*[db.t1[x] for x in integrity.keys()]).first()
self.assertIsNot(row2, None)
self.assertEqual(row2.as_dict(), integrity)
db.commit()
db.close()
# Change field rname
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', rname='faa'),
Field('b', 'text', rname='xb'), migrate='.storage.table')
db.define_table('t1', Field('aa', rname='faa'),
Field('b', 'text', rname='xb'), migrate='.storage.rname',
rname='foo')
data = dict(aa='aa4', b='b4')
checkWrite(db, db.tt, data)
checkWrite(db, db.t1, data)
row = db(db.tt).select(*[db.tt[x] for x in integrity.keys()]).first()
self.assertIsNot(row, None)
self.assertEqual(row.as_dict(), integrity)
row2 = db(db.t1).select(*[db.t1[x] for x in integrity.keys()]).first()
self.assertIsNot(row2, None)
self.assertEqual(row2.as_dict(), integrity)
db.commit()
db.close()
# Drop field defined by ALTER TABLE
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', rname='faa'),
migrate='.storage.table')
db.define_table('t1', Field('aa', rname='faa'),
migrate='.storage.rname', rname='foo')
data = dict(aa='aa5')
checkWrite(db, db.tt, data)
checkWrite(db, db.t1, data)
db.tt.drop()
db.t1.drop()
db.commit()
db.close()
def tearDown(self):
if os.path.exists('.storage.db'):
os.unlink('.storage.db')
if os.path.exists('.storage.table'):
os.unlink('.storage.table')
if os.path.exists('.storage.rname'):
os.unlink('.storage.rname')
class TestReference(DALtest):
def testRun(self):
scenarios = (
(True, 'CASCADE'),
(False, 'CASCADE'),
(False, 'SET NULL'),
)
for (b, ondelete) in scenarios:
db = self.connect(bigint_id=b)
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
db.define_table('tt', Field('name'),
Field('aa','reference tt',ondelete=ondelete))
db.commit()
x = db.tt.insert(name='xxx')
self.assertEqual(x.id, 1)
self.assertEqual(x['id'], 1)
x.aa = x
self.assertEqual(x.aa, 1)
x.update_record()
x1 = db.tt[1]
self.assertEqual(x1.aa, 1)
self.assertEqual(x1.aa.aa.aa.aa.aa.aa.name, 'xxx')
y=db.tt.insert(name='yyy', aa = x1)
self.assertEqual(y.aa, x1.id)
if not DEFAULT_URI.startswith('mssql'):
self.assertEqual(db.tt.insert(name='zzz'), 3)
self.assertEqual(db(db.tt.name).count(), 3)
db(db.tt.id == x).delete()
expected_count = {
'SET NULL': 2,
'NO ACTION': 2,
'CASCADE': 1,
}
self.assertEqual(db(db.tt.name).count(), expected_count[ondelete])
if ondelete == 'SET NULL':
self.assertEqual(db(db.tt.name == 'yyy').select()[0].aa, None)
self.tearDown()
class TestClientLevelOps(DALtest):
def testRun(self):
db = self.connect()
db.define_table(
'tt',
Field('aa', represent=lambda x,r:'x'+x),
Field('bb', type='integer', represent=lambda x,r:'y'+str(x)))
db.commit()
db.tt.insert(aa="test", bb=1)
rows1 = db(db.tt.id<0).select()
rows2 = db(db.tt.id>0).select()
self.assertNotEqual(rows1, rows2)
rows1 = db(db.tt.id>0).select()
rows2 = db(db.tt.id>0).select()
self.assertEqual(rows1, rows2)
rows3 = rows1 + rows2
self.assertEqual(len(rows3), 2)
rows4 = rows1 | rows2
self.assertEqual(len(rows4), 1)
rows5 = rows1 & rows2
self.assertEqual(len(rows5), 1)
rows6 = rows1.find(lambda row: row.aa=="test")
self.assertEqual(len(rows6), 1)
rows7 = rows2.exclude(lambda row: row.aa=="test")
self.assertEqual(len(rows7), 1)
rows8 = rows5.sort(lambda row: row.aa)
self.assertEqual(len(rows8), 1)
def represent(f, v, r):
return 'z' + str(v)
db.representers = {
'rows_render': represent,
}
db.tt.insert(aa="foo", bb=2)
rows = db(db.tt.id>0).select()
exp1 = [Row(aa='ztest', bb='z1', id=rows[0]['id']),
Row(aa='zfoo', bb='z2', id=rows[1]['id'])]
exp2 = [Row(aa='ztest', bb=1, id=rows[0]['id']),
Row(aa='zfoo', bb=2, id=rows[1]['id'])]
exp3 = [Row(aa='test', bb='z1', id=rows[0]['id']),
Row(aa='foo', bb='z2', id=rows[1]['id'])]
self.assertEqual(rows.render(i=0), exp1[0])
self.assertEqual(rows.render(i=0, fields=[db.tt.aa, db.tt.bb]),
exp1[0])
self.assertEqual(rows.render(i=0, fields=[db.tt.aa]), exp2[0])
self.assertEqual(rows.render(i=0, fields=[db.tt.bb]), exp3[0])
self.assertEqual(list(rows.render()), exp1)
self.assertEqual(list(rows.render(fields=[db.tt.aa, db.tt.bb])), exp1)
self.assertEqual(list(rows.render(fields=[db.tt.aa])), exp2)
self.assertEqual(list(rows.render(fields=[db.tt.bb])), exp3)
ret = rows.render(i=0)
rows = db(db.tt.id>0).select()
rows.compact=False
row = rows[0]
self.assertIn('tt', row)
self.assertIn('id', row.tt)
self.assertNotIn('id', row)
rows.compact=True
row = rows[0]
self.assertNotIn('tt', row)
self.assertIn('id', row)
rows = db(db.tt.id>0).select(db.tt.id.max())
rows.compact=False
row = rows[0]
self.assertNotIn('tt', row)
self.assertIn('_extra', row)
rows = db(db.tt.id>0).select(db.tt.id.max())
rows.compact=True
row = rows[0]
self.assertNotIn('tt', row)
self.assertIn('_extra', row)
db.tt.drop()
db.define_table('tt', Field('aa'), Field.Virtual('bb', lambda row: ':p'))
db.tt.insert(aa="test")
rows = db(db.tt.id>0).select()
row = rows.first()
self.assertNotIn('tt', row)
self.assertIn('id', row)
self.assertIn('bb', row)
rows.compact = False
row = rows.first()
self.assertIn('tt', row)
self.assertEqual(len(row.keys()), 1)
self.assertIn('id', row.tt)
self.assertIn('bb', row.tt)
self.assertNotIn('id', row)
self.assertNotIn('bb', row)
class TestVirtualFields(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt', Field('aa'))
db.commit()
db.tt.insert(aa="test")
class Compute:
def a_upper(row): return row.tt.aa.upper()
db.tt.virtualfields.append(Compute())
assert db(db.tt.id>0).select().first().a_upper == 'TEST'
class TestComputedFields(DALtest):
def testRun(self):
db = self.connect()
db.define_table('tt',
Field('aa'),
Field('bb',default='x'),
Field('cc',compute=lambda r: r.aa+r.bb))
db.commit()
id = db.tt.insert(aa="z")
self.assertEqual(db.tt[id].cc,'zx')
db.tt.drop()
db.commit()
# test checking that a compute field can refer to earlier-defined computed fields
db.define_table('tt',
Field('aa'),
Field('bb',default='x'),
Field('cc',compute=lambda r: r.aa+r.bb),
Field('dd',compute=lambda r: r.bb + r.cc))
db.commit()
id = db.tt.insert(aa="z")
self.assertEqual(db.tt[id].dd,'xzx')
class TestCommonFilters(DALtest):
def testRun(self):
db = self.connect()
db.define_table('t1', Field('aa', 'integer'))
db.define_table('t2', Field('aa', 'integer'), Field('b', db.t1))
i1 = db.t1.insert(aa=1)
i2 = db.t1.insert(aa=2)
i3 = db.t1.insert(aa=3)
db.t2.insert(aa=4, b=i1)
db.t2.insert(aa=5, b=i2)
db.t2.insert(aa=6, b=i2)
db.t1._common_filter = lambda q: db.t1.aa>1
self.assertEqual(db(db.t1).count(),2)
self.assertEqual(db(db.t1).count(),2)
q = db.t2.b==db.t1.id
self.assertEqual(db(q).count(),2)
self.assertEqual(db(q).count(),2)
self.assertEqual(len(db(db.t1).select(left=db.t2.on(q))),3)
db.t2._common_filter = lambda q: db.t2.aa<6
self.assertEqual(db(q).count(),1)
self.assertEqual(db(q).count(),1)
self.assertEqual(len(db(db.t1).select(left=db.t2.on(q))),2)
# test delete
self.assertEqual(db(db.t2).count(),2)
db(db.t2).delete()
self.assertEqual(db(db.t2).count(),0)
db.t2._common_filter = None
self.assertEqual(db(db.t2).count(),1)
# test update
db.t2.insert(aa=4, b=i1)
db.t2.insert(aa=5, b=i2)
db.t2._common_filter = lambda q: db.t2.aa<6
self.assertEqual(db(db.t2).count(),2)
db(db.t2).update(aa=6)
self.assertEqual(db(db.t2).count(),0)
db.t2._common_filter = None
self.assertEqual(db(db.t2).count(),3)
class TestImportExportFields(DALtest):
def testRun(self):
db = self.connect()
db.define_table('person', Field('name'))
db.define_table('pet',Field('friend',db.person),Field('name'))
for n in range(2):
db(db.pet).delete()
db(db.person).delete()
for k in range(10):
id = db.person.insert(name=str(k))
db.pet.insert(friend=id,name=str(k))
db.commit()
stream = StringIO()
db.export_to_csv_file(stream)
db(db.pet).delete()
db(db.person).delete()
stream = StringIO(stream.getvalue())
db.import_from_csv_file(stream)
assert db(db.person.id==db.pet.friend)(db.person.name==db.pet.name).count()==10
class TestImportExportUuidFields(DALtest):
def testRun(self):
db = self.connect()
db.define_table('person', Field('name'),Field('uuid'))
db.define_table('pet',Field('friend',db.person),Field('name'))
for n in range(2):
db(db.pet).delete()
db(db.person).delete()
for k in range(10):
id = db.person.insert(name=str(k),uuid=str(k))
db.pet.insert(friend=id,name=str(k))
db.commit()
stream = StringIO()
db.export_to_csv_file(stream)
stream = StringIO(stream.getvalue())
db.import_from_csv_file(stream)
assert db(db.person).count()==10
assert db(db.person.id==db.pet.friend)(db.person.name==db.pet.name).count()==20
class TestDALDictImportExport(unittest.TestCase):
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('person', Field('name', default="Michael"),Field('uuid'))
db.define_table('pet',Field('friend',db.person),Field('name'))
dbdict = db.as_dict(flat=True, sanitize=False)
assert isinstance(dbdict, dict)
uri = dbdict["uri"]
assert isinstance(uri, basestring) and uri
assert len(dbdict["tables"]) == 2
assert len(dbdict["tables"][0]["fields"]) == 3
assert dbdict["tables"][0]["fields"][1]["type"] == db.person.name.type
assert dbdict["tables"][0]["fields"][1]["default"] == db.person.name.default
db2 = DAL(**dbdict)
assert len(db.tables) == len(db2.tables)
assert hasattr(db2, "pet") and isinstance(db2.pet, Table)
assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field)
db.pet.drop()
db.commit()
db2.commit()
dbjson = db.as_json(sanitize=False)
assert isinstance(dbjson, basestring) and len(dbjson) > 0
db3 = DAL(**json.loads(dbjson))
assert hasattr(db3, "person") and hasattr(db3.person, "uuid")
assert db3.person.uuid.type == db.person.uuid.type
db3.person.drop()
db3.commit()
db3.close()
mpfc = "Monty Python's Flying Circus"
dbdict4 = {"uri": DEFAULT_URI,
"tables":[{"tablename": "tvshow",
"fields": [{"fieldname": "name",
"default":mpfc},
{"fieldname": "rating",
"type":"double"}]},
{"tablename": "staff",
"fields": [{"fieldname": "name",
"default":"Michael"},
{"fieldname": "food",
"default":"Spam"},
{"fieldname": "tvshow",
"type": "reference tvshow"}]}]}
db4 = DAL(**dbdict4)
assert "staff" in db4.tables
assert "name" in db4.staff
assert db4.tvshow.rating.type == "double"
assert (db4.tvshow.insert(), db4.tvshow.insert(name="Loriot"),
db4.tvshow.insert(name="Il Mattatore")) == (1, 2, 3)
assert db4(db4.tvshow).select().first().id == 1
assert db4(db4.tvshow).select().first().name == mpfc
db4.staff.drop()
db4.tvshow.drop()
db4.commit()
dbdict5 = {"uri": DEFAULT_URI}
db5 = DAL(**dbdict5)
assert db5.tables in ([], None)
assert not (str(db5) in ("", None))
dbdict6 = {"uri": DEFAULT_URI,
"tables":[{"tablename": "staff"},
{"tablename": "tvshow",
"fields": [{"fieldname": "name"},
{"fieldname": "rating", "type":"double"}
]
}]
}
db6 = DAL(**dbdict6)
assert len(db6["staff"].fields) == 1
assert "name" in db6["tvshow"].fields
assert db6.staff.insert() is not None
assert db6(db6.staff).select().first().id == 1
db6.staff.drop()
db6.tvshow.drop()
db6.commit()
db.close()
db2.close()
db4.close()
db5.close()
db6.close()
class TestSelectAsDict(DALtest):
def testSelect(self):
db = self.connect()
db.define_table(
'a_table',
Field('b_field'),
Field('a_field'),
)
db.a_table.insert(a_field="aa1", b_field="bb1")
rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_dict=True)
self.assertEqual(rtn[0]['b_field'], 'bb1')
rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True)
self.assertEqual(rtn[0]['b_field'], 'bb1')
self.assertEqual(list(rtn[0].keys()), ['id', 'b_field', 'a_field'])
class TestExecuteSQL(DALtest):
def testSelect(self):
db = self.connect(DEFAULT_URI, entity_quoting=False)
db.define_table(
'a_table',
Field('b_field'),
Field('a_field'),
)
db.a_table.insert(a_field="aa1", b_field="bb1")
rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_dict=True)
self.assertEqual(rtn[0]['b_field'], 'bb1')
rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True)
self.assertEqual(rtn[0]['b_field'], 'bb1')
self.assertEqual(rtn[0]['b_field'], 'bb1')
self.assertEqual(list(rtn[0].keys()), ['id', 'b_field', 'a_field'])
rtn = db.executesql("select id, b_field, a_field from a_table", fields=db.a_table)
self.assertTrue(all(x in rtn[0].keys() for x in ['id', 'b_field', 'a_field']))
self.assertEqual(rtn[0].b_field, 'bb1')
rtn = db.executesql("select id, b_field, a_field from a_table", fields=db.a_table,
colnames=['a_table.id', 'a_table.b_field', 'a_table.a_field'])
self.assertTrue(all(x in rtn[0].keys() for x in ['id', 'b_field', 'a_field']))
self.assertEqual(rtn[0].b_field, 'bb1')
rtn = db.executesql("select COUNT(*) from a_table", fields=[db.a_table.id.count()], colnames=['foo'])
self.assertEqual(rtn[0].foo, 1)
class TestRNameTable(DALtest):
#tests for highly experimental rname attribute
def testSelect(self):
db = self.connect()
rname = 'a_very_complicated_tablename'
db.define_table(
'easy_name',
Field('a_field'),
rname=rname
)
rtn = db.easy_name.insert(a_field='a')
self.assertEqual(rtn.id, 1)
rtn = db(db.easy_name.a_field == 'a').select()
self.assertEqual(len(rtn), 1)
self.assertEqual(rtn[0].id, 1)
self.assertEqual(rtn[0].a_field, 'a')
db.easy_name.insert(a_field='b')
rtn = db(db.easy_name.id > 0).delete()
self.assertEqual(rtn, 2)
rtn = db(db.easy_name.id > 0).count()
self.assertEqual(rtn, 0)
db.easy_name.insert(a_field='a')
db.easy_name.insert(a_field='b')
rtn = db(db.easy_name.id > 0).count()
self.assertEqual(rtn, 2)
rtn = db(db.easy_name.a_field == 'a').update(a_field='c')
rtn = db(db.easy_name.a_field == 'c').count()
self.assertEqual(rtn, 1)
rtn = db(db.easy_name.a_field != 'c').count()
self.assertEqual(rtn, 1)
avg = db.easy_name.id.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 3)
rname = 'this_is_the_person_table'
db.define_table(
'person',
Field('name', default="Michael"),
Field('uuid'),
rname=rname
)
rname = 'this_is_the_pet_table'
db.define_table(
'pet',
Field('friend','reference person'),
Field('name'),
rname=rname
)
michael = db.person.insert() #default insert
john = db.person.insert(name='John')
luke = db.person.insert(name='Luke')
#michael owns Phippo
phippo = db.pet.insert(friend=michael, name="Phippo")
#john owns Dunstin and Gertie
dunstin = db.pet.insert(friend=john, name="Dunstin")
gertie = db.pet.insert(friend=john, name="Gertie")
rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id|db.pet.id)
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[1].person.id, john)
self.assertEqual(rtn[1].person.name, 'John')
self.assertEqual(rtn[1].pet.name, 'Dunstin')
self.assertEqual(rtn[2].pet.name, 'Gertie')
#fetch owners, eventually with pet
#main point is retrieving Luke with no pets
rtn = db(db.person.id > 0).select(
orderby=db.person.id|db.pet.id,
left=db.pet.on(db.person.id == db.pet.friend)
)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[3].person.name, 'Luke')
self.assertEqual(rtn[3].person.id, luke)
self.assertEqual(rtn[3].pet.name, None)
#lets test a subquery
subq = db(db.pet.name == "Gertie")._select(db.pet.friend)
rtn = db(db.person.id.belongs(subq)).select()
self.assertEqual(rtn[0].id, 2)
self.assertEqual(rtn[0]('person.name'), 'John')
#as dict
rtn = db(db.person.id > 0).select().as_dict()
self.assertEqual(rtn[1]['name'], 'Michael')
#as list
rtn = db(db.person.id > 0).select().as_list()
self.assertEqual(rtn[0]['name'], 'Michael')
#isempty
rtn = db(db.person.id > 0).isempty()
self.assertEqual(rtn, False)
#join argument
rtn = db(db.person).select(orderby=db.person.id|db.pet.id,
join=db.pet.on(db.person.id==db.pet.friend))
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[1].person.id, john)
self.assertEqual(rtn[1].person.name, 'John')
self.assertEqual(rtn[1].pet.name, 'Dunstin')
self.assertEqual(rtn[2].pet.name, 'Gertie')
#aliases
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
rname = 'the_cubs'
db.define_table('pet_farm',
Field('name'),
Field('father','reference pet_farm'),
Field('mother','reference pet_farm'),
rname=rname
)
minali = db.pet_farm.insert(name='Minali')
osbert = db.pet_farm.insert(name='Osbert')
#they had a cub
selina = db.pet_farm.insert(name='Selina', father=osbert, mother=minali)
father = db.pet_farm.with_alias('father')
mother = db.pet_farm.with_alias('mother')
#fetch pets with relatives
rtn = db().select(
db.pet_farm.name, father.name, mother.name,
left=[
father.on(father.id == db.pet_farm.father),
mother.on(mother.id == db.pet_farm.mother)
],
orderby=db.pet_farm.id
)
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].pet_farm.name, 'Minali')
self.assertEqual(rtn[0].father.name, None)
self.assertEqual(rtn[0].mother.name, None)
self.assertEqual(rtn[1].pet_farm.name, 'Osbert')
self.assertEqual(rtn[2].pet_farm.name, 'Selina')
self.assertEqual(rtn[2].father.name, 'Osbert')
self.assertEqual(rtn[2].mother.name, 'Minali')
def testJoin(self):
db = self.connect()
rname = 'this_is_table_t1'
rname2 = 'this_is_table_t2'
db.define_table('t1', Field('aa'), rname=rname)
db.define_table('t2', Field('aa'), Field('b', db.t1), rname=rname2)
i1 = db.t1.insert(aa='1')
i2 = db.t1.insert(aa='2')
i3 = db.t1.insert(aa='3')
db.t2.insert(aa='4', b=i1)
db.t2.insert(aa='5', b=i2)
db.t2.insert(aa='6', b=i2)
self.assertEqual(len(db(db.t1.id
== db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)), 3)
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t2.aa, '6')
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)), 4)
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa, groupby=db.t1.aa)),
3)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
1)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
2)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
0)
db.t2.drop()
db.t1.drop()
db.define_table('person',Field('name'), rname=rname)
id = db.person.insert(name="max")
self.assertEqual(id.name,'max')
db.define_table('dog',Field('name'),Field('ownerperson','reference person'), rname=rname2)
db.dog.insert(name='skipper',ownerperson=1)
row = db(db.person.id==db.dog.ownerperson).select().first()
self.assertEqual(row[db.person.name],'max')
self.assertEqual(row['person.name'],'max')
db.dog.drop()
self.assertEqual(len(db.person._referenced_by),0)
class TestRNameFields(DALtest):
# tests for highly experimental rname attribute
def testSelect(self):
db = self.connect()
rname = 'a_very_complicated_fieldname'
rname2 = 'rrating_from_1_to_10'
db.define_table(
'easy_name',
Field('a_field', rname=rname),
Field('rating', 'integer', rname=rname2, default=2)
)
rtn = db.easy_name.insert(a_field='a')
self.assertEqual(rtn.id, 1)
rtn = db(db.easy_name.a_field == 'a').select()
self.assertEqual(len(rtn), 1)
self.assertEqual(rtn[0].id, 1)
self.assertEqual(rtn[0].a_field, 'a')
db.easy_name.insert(a_field='b')
rtn = db(db.easy_name.id > 0).delete()
self.assertEqual(rtn, 2)
rtn = db(db.easy_name.id > 0).count()
self.assertEqual(rtn, 0)
db.easy_name.insert(a_field='a')
db.easy_name.insert(a_field='b')
rtn = db(db.easy_name.id > 0).count()
self.assertEqual(rtn, 2)
rtn = db(db.easy_name.a_field == 'a').update(a_field='c')
rtn = db(db.easy_name.a_field == 'c').count()
self.assertEqual(rtn, 1)
rtn = db(db.easy_name.a_field != 'c').count()
self.assertEqual(rtn, 1)
avg = db.easy_name.id.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 3)
avg = db.easy_name.rating.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 2)
rname = 'this_is_the_person_name'
db.define_table(
'person',
Field('id', type='id', rname='fooid'),
Field('name', default="Michael", rname=rname),
Field('uuid')
)
rname = 'this_is_the_pet_name'
db.define_table(
'pet',
Field('friend','reference person'),
Field('name', rname=rname)
)
michael = db.person.insert() #default insert
john = db.person.insert(name='John')
luke = db.person.insert(name='Luke')
#michael owns Phippo
phippo = db.pet.insert(friend=michael, name="Phippo")
#john owns Dunstin and Gertie
dunstin = db.pet.insert(friend=john, name="Dunstin")
gertie = db.pet.insert(friend=john, name="Gertie")
rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id|db.pet.id)
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[1].person.id, john)
self.assertEqual(rtn[1].person.name, 'John')
self.assertEqual(rtn[1].pet.name, 'Dunstin')
self.assertEqual(rtn[2].pet.name, 'Gertie')
#fetch owners, eventually with pet
#main point is retrieving Luke with no pets
rtn = db(db.person.id > 0).select(
orderby=db.person.id|db.pet.id,
left=db.pet.on(db.person.id == db.pet.friend)
)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[3].person.name, 'Luke')
self.assertEqual(rtn[3].person.id, luke)
self.assertEqual(rtn[3].pet.name, None)
#lets test a subquery
subq = db(db.pet.name == "Gertie")._select(db.pet.friend)
rtn = db(db.person.id.belongs(subq)).select()
self.assertEqual(rtn[0].id, 2)
self.assertEqual(rtn[0]('person.name'), 'John')
#as dict
rtn = db(db.person.id > 0).select().as_dict()
self.assertEqual(rtn[1]['name'], 'Michael')
#as list
rtn = db(db.person.id > 0).select().as_list()
self.assertEqual(rtn[0]['name'], 'Michael')
#isempty
rtn = db(db.person.id > 0).isempty()
self.assertEqual(rtn, False)
#join argument
rtn = db(db.person).select(orderby=db.person.id|db.pet.id,
join=db.pet.on(db.person.id==db.pet.friend))
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[1].person.id, john)
self.assertEqual(rtn[1].person.name, 'John')
self.assertEqual(rtn[1].pet.name, 'Dunstin')
self.assertEqual(rtn[2].pet.name, 'Gertie')
#aliases
rname = 'the_cub_name'
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
db.define_table('pet_farm',
Field('name', rname=rname),
Field('father','reference pet_farm'),
Field('mother','reference pet_farm'),
)
minali = db.pet_farm.insert(name='Minali')
osbert = db.pet_farm.insert(name='Osbert')
#they had a cub
selina = db.pet_farm.insert(name='Selina', father=osbert, mother=minali)
father = db.pet_farm.with_alias('father')
mother = db.pet_farm.with_alias('mother')
#fetch pets with relatives
rtn = db().select(
db.pet_farm.name, father.name, mother.name,
left=[
father.on(father.id == db.pet_farm.father),
mother.on(mother.id == db.pet_farm.mother)
],
orderby=db.pet_farm.id
)
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].pet_farm.name, 'Minali')
self.assertEqual(rtn[0].father.name, None)
self.assertEqual(rtn[0].mother.name, None)
self.assertEqual(rtn[1].pet_farm.name, 'Osbert')
self.assertEqual(rtn[2].pet_farm.name, 'Selina')
self.assertEqual(rtn[2].father.name, 'Osbert')
self.assertEqual(rtn[2].mother.name, 'Minali')
def testRun(self):
db = self.connect()
rname = 'a_very_complicated_fieldname'
for ft in ['string', 'text', 'password', 'upload', 'blob']:
db.define_table('tt', Field('aa', ft, default='', rname=rname))
self.assertEqual(db.tt.insert(aa='x'), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 'x')
db.tt.drop()
db.define_table('tt', Field('aa', 'integer', default=1, rname=rname))
self.assertEqual(db.tt.insert(aa=3), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
db.tt.drop()
db.define_table('tt', Field('aa', 'double', default=1, rname=rname))
self.assertEqual(db.tt.insert(aa=3.1), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
db.tt.drop()
db.define_table('tt', Field('aa', 'boolean', default=True, rname=rname))
self.assertEqual(db.tt.insert(aa=True), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, True)
db.tt.drop()
db.define_table('tt', Field('aa', 'json', default={}, rname=rname))
self.assertEqual(db.tt.insert(aa={}), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, {})
db.tt.drop()
db.define_table('tt', Field('aa', 'date',
default=datetime.date.today(), rname=rname))
t0 = datetime.date.today()
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
db.tt.drop()
db.define_table('tt', Field('aa', 'datetime',
default=datetime.datetime.today(), rname=rname))
t0 = datetime.datetime(
1971,
12,
21,
10,
30,
55,
0,
)
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
## Row APIs
row = db().select(db.tt.aa)[0]
self.assertEqual(db.tt[1].aa,t0)
self.assertEqual(db.tt['aa'],db.tt.aa)
self.assertEqual(db.tt(1).aa,t0)
self.assertTrue(db.tt(1,aa=None)==None)
self.assertFalse(db.tt(1,aa=t0)==None)
self.assertEqual(row.aa,t0)
self.assertEqual(row['aa'],t0)
self.assertEqual(row['tt.aa'],t0)
self.assertEqual(row('tt.aa'),t0)
self.assertTrue('aa' in row)
self.assertTrue('pydal' not in row)
self.assertTrue(hasattr(row, 'aa'))
self.assertFalse(hasattr(row, 'pydal'))
## Lazy and Virtual fields
db.tt.b = Field.Virtual(lambda row: row.tt.aa)
db.tt.c = Field.Lazy(lambda row: row.tt.aa)
row = db().select(db.tt.aa)[0]
self.assertEqual(row.b,t0)
self.assertEqual(row.c(),t0)
db.tt.drop()
db.define_table('tt', Field('aa', 'time', default='11:30', rname=rname))
t0 = datetime.time(10, 30, 55)
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
def testInsert(self):
db = self.connect()
rname = 'a_very_complicated_fieldname'
db.define_table('tt', Field('aa', rname=rname))
self.assertEqual(db.tt.insert(aa='1'), 1)
self.assertEqual(db.tt.insert(aa='1'), 2)
self.assertEqual(db.tt.insert(aa='1'), 3)
self.assertEqual(db(db.tt.aa == '1').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
self.assertEqual(db(db.tt.aa == '2').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), False)
self.assertEqual(db(db.tt.aa == '2').delete(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
def testJoin(self):
db = self.connect()
rname = 'this_is_field_aa'
rname2 = 'this_is_field_b'
db.define_table('t1', Field('aa', rname=rname))
db.define_table('t2', Field('aa', rname=rname), Field('b', db.t1, rname=rname2))
i1 = db.t1.insert(aa='1')
i2 = db.t1.insert(aa='2')
i3 = db.t1.insert(aa='3')
db.t2.insert(aa='4', b=i1)
db.t2.insert(aa='5', b=i2)
db.t2.insert(aa='6', b=i2)
self.assertEqual(len(db(db.t1.id
== db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)), 3)
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t2.aa, '6')
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)), 4)
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa, groupby=db.t1.aa)),
3)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
1)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
2)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
0)
db.t2.drop()
db.t1.drop()
db.define_table('person',Field('name', rname=rname))
id = db.person.insert(name="max")
self.assertEqual(id.name,'max')
db.define_table('dog',Field('name', rname=rname),Field('ownerperson','reference person', rname=rname2))
db.dog.insert(name='skipper',ownerperson=1)
row = db(db.person.id==db.dog.ownerperson).select().first()
self.assertEqual(row[db.person.name],'max')
self.assertEqual(row['person.name'],'max')
db.dog.drop()
self.assertEqual(len(db.person._referenced_by),0)
def testTFK(self):
db = self.connect()
if 'reference TFK' not in db._adapter.types:
self.skipTest('Adapter does not support TFK references')
db.define_table('t1',
Field('id1', type='string', length=1, rname='foo1'),
Field('id2', type='integer', rname='foo2'),
Field('val', type='integer'),
primarykey=['id1', 'id2'])
db.define_table('t2',
Field('ref1', type=db.t1.id1, rname='bar1'),
Field('ref2', type=db.t1.id2, rname='bar2'))
db.t1.insert(id1='a', id2=1, val=10)
db.t1.insert(id1='a', id2=2, val=30)
db.t2.insert(ref1='a', ref2=1)
query = (db.t1.id1 == db.t2.ref1) & (db.t1.id2 == db.t2.ref2)
result = db(query).select(db.t1.ALL)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['id1'], 'a')
self.assertEqual(result[0]['id2'], 1)
self.assertEqual(result[0]['val'], 10)
class TestQuoting(DALtest):
# tests for case sensitivity
def testCase(self):
db = self.connect(ignore_field_case=False, entity_quoting=True)
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
t0 = db.define_table('t0',
Field('f', 'string'))
t1 = db.define_table('b',
Field('B', t0),
Field('words', 'text'))
blather = 'blah blah and so'
t0[0] = {'f': 'content'}
t1[0] = {'B': int(t0[1]['id']),
'words': blather}
r = db(db.t0.id==db.b.B).select()
self.assertEqual(r[0].b.words, blather)
t1.drop()
t0.drop()
# test field case
try:
t0 = db.define_table('table_is_a_test',
Field('a_a'),
Field('a_A'))
except Exception as e:
# some db does not support case sensitive field names mysql is one of them.
if DEFAULT_URI.startswith('mysql:') or DEFAULT_URI.startswith('sqlite:'):
db.rollback()
return
if 'Column names in each table must be unique' in e.args[1]:
db.rollback()
return
raise e
t0[0] = dict(a_a = 'a_a', a_A='a_A')
self.assertEqual(t0[1].a_a, 'a_a')
self.assertEqual(t0[1].a_A, 'a_A')
def testPKFK(self):
# test primary keys
db = self.connect(ignore_field_case=False)
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
# test table without surrogate key. Length must is limited to
# 100 because of MySQL limitations: it cannot handle more than
# 767 bytes in unique keys.
t0 = db.define_table('t0', Field('Code', length=100), primarykey=['Code'])
t2 = db.define_table('t2', Field('f'), Field('t0_Code', 'reference t0'))
t3 = db.define_table('t3', Field('f', length=100), Field('t0_Code', t0.Code), primarykey=['f'])
t4 = db.define_table('t4', Field('f', length=100), Field('t0', t0), primarykey=['f'])
try:
t5 = db.define_table('t5', Field('f', length=100), Field('t0', 'reference no_table_wrong_reference'), primarykey=['f'])
except Exception as e:
self.assertTrue(isinstance(e, KeyError))
if DEFAULT_URI.startswith('mssql'):
#there's no drop cascade in mssql
t3.drop()
t4.drop()
t2.drop()
t0.drop()
else:
t0.drop('cascade')
t2.drop()
t3.drop()
t4.drop()
class TestTableAndFieldCase(unittest.TestCase):
"""
at the Python level we should not allow db.C and db.c because of .table conflicts on windows
but it should be possible to map two different names into distinct tables "c" and "C" at the Python level
By default Python models names should be mapped into lower case table names and assume case insensitivity.
"""
def testme(self):
return
class TestQuotesByDefault(unittest.TestCase):
"""
all default tables names should be quoted unless an explicit mapping has been given for a table.
"""
def testme(self):
return
class TestGis(DALtest):
def testGeometry(self):
from pydal import geoPoint, geoLine, geoPolygon
if not IS_POSTGRESQL: return
db = self.connect()
t0 = db.define_table('t0', Field('point', 'geometry()'))
t1 = db.define_table('t1', Field('line', 'geometry(public, 4326, 2)'))
t2 = db.define_table('t2', Field('polygon', 'geometry(public, 4326, 2)'))
t0.insert(point=geoPoint(1,1))
text = db(db.t0.id).select(db.t0.point.st_astext()).first()[db.t0.point.st_astext()]
self.assertEqual(text, "POINT(1 1)")
t1.insert(line=geoLine((1,1),(2,2)))
text = db(db.t1.id).select(db.t1.line.st_astext()).first()[db.t1.line.st_astext()]
self.assertEqual(text, "LINESTRING(1 1,2 2)")
t2.insert(polygon=geoPolygon((0,0),(2,0),(2,2),(0,2),(0,0)))
text = db(db.t2.id).select(db.t2.polygon.st_astext()).first()[db.t2.polygon.st_astext()]
self.assertEqual(text, "POLYGON((0 0,2 0,2 2,0 2,0 0))")
query = t0.point.st_intersects(geoLine((0,0),(2,2)))
output = db(query).select(db.t0.point).first()[db.t0.point]
self.assertEqual(output, "POINT(1 1)")
query = t2.polygon.st_contains(geoPoint(1,1))
n = db(query).count()
self.assertEqual(n, 1)
x=t0.point.st_x()
y=t0.point.st_y()
point = db(t0.id).select(x, y).first()
self.assertEqual(point[x], 1)
self.assertEqual(point[y], 1)
def testGeometryCase(self):
from pydal import geoPoint, geoLine, geoPolygon
if not IS_POSTGRESQL: return
db = self.connect(ignore_field_case=False)
t0 = db.define_table('t0', Field('point', 'geometry()'), Field('Point', 'geometry()'))
t0.insert(point=geoPoint(1,1))
t0.insert(Point=geoPoint(2,2))
def testGisMigration(self):
if not IS_POSTGRESQL: return
for b in [True, False]:
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=b)
t0 = db.define_table('t0', Field('Point', 'geometry()'),
Field('rname_point', 'geometry()', rname='foo'))
db.commit()
db.close()
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=b)
t0 = db.define_table('t0', Field('New_point', 'geometry()'))
t0.drop()
db.commit()
db.close()
class TestSQLCustomType(DALtest):
def testRun(self):
db = self.connect()
from pydal.helpers.classes import SQLCustomType
native_double = "double"
native_string = "string"
if hasattr(db._adapter, 'types'):
native_double = db._adapter.types['double']
native_string = db._adapter.types['string'] % {'length': 256}
basic_t = SQLCustomType(type = "double", native = native_double)
basic_t_str = SQLCustomType(type = "string", native = native_string)
t0=db.define_table('t0', Field("price", basic_t), Field("product", basic_t_str))
r_id = t0.insert(price=None, product=None)
row = db(t0.id == r_id).select(t0.ALL).first()
self.assertEqual(row['price'], None)
self.assertEqual(row['product'], None)
r_id = t0.insert(price=1.2, product="car")
row=db(t0.id == r_id).select(t0.ALL).first()
self.assertEqual(row['price'], 1.2)
self.assertEqual(row['product'], 'car')
t0.drop()
import zlib
compressed = SQLCustomType(
type ='text',
native='text',
encoder =(lambda x: zlib.compress(x or '', 1)),
decoder = (lambda x: zlib.decompress(x))
)
t1=db.define_table('t0',Field('cdata', compressed))
#r_id=t1.insert(cdata="car")
#row=db(t1.id == r_id).select(t1.ALL).first()
#self.assertEqual(row['cdata'], "'car'")
class TestLazy(DALtest):
def testRun(self):
db = self.connect(lazy_tables=True)
t0 = db.define_table('t0', Field('name'))
self.assertTrue(('t0' in db._LAZY_TABLES.keys()))
db.t0.insert(name='1')
self.assertFalse(('t0' in db._LAZY_TABLES.keys()))
def testLazyGetter(self):
db = self.connect(check_reserved=None, lazy_tables=True)
db.define_table('tt', Field('value', 'integer'))
db.define_table('ttt',
Field('value', 'integer'),
Field('tt_id', 'reference tt'),
)
# Force table definition
db.ttt.value.writable=False
idd=db.tt.insert(value=0)
db.ttt.insert(tt_id=idd)
def testRowNone(self):
db = self.connect(check_reserved=None, lazy_tables=True)
tt = db.define_table('tt', Field('value', 'integer'))
db.tt.insert(value=None)
row = db(db.tt).select(db.tt.ALL).first()
self.assertEqual(row.value, None)
self.assertEqual(row[db.tt.value], None)
self.assertEqual(row['tt.value'], None)
self.assertEqual(row.get('tt.value'), None)
self.assertEqual(row['value'], None)
self.assertEqual(row.get('value'), None)
def testRowExtra(self):
db = self.connect(check_reserved=None, lazy_tables=True)
tt = db.define_table('tt', Field('value', 'integer'))
db.tt.insert(value=1)
row = db(db.tt).select('value').first()
self.assertEqual(row.value, 1)
class TestRedefine(unittest.TestCase):
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'], lazy_tables=True, migrate=False)
db.define_table('t_a', Field('code'))
self.assertTrue('code' in db.t_a)
self.assertTrue('code' in db['t_a'])
db.define_table('t_a', Field('code_a'), redefine=True)
self.assertFalse('code' in db.t_a)
self.assertFalse('code' in db['t_a'])
self.assertTrue('code_a' in db.t_a)
self.assertTrue('code_a' in db['t_a'])
db.close()
class TestUpdateInsert(DALtest):
def testRun(self):
db = self.connect()
t0 = db.define_table('t0', Field('name'))
i_id = t0.update_or_insert((t0.id == 1), name='web2py')
u_id = t0.update_or_insert((t0.id == i_id), name='web2py2')
self.assertTrue(i_id != None)
self.assertTrue(u_id == None)
self.assertTrue(db(t0).count() == 1)
self.assertTrue(db(t0.name == 'web2py').count() == 0)
self.assertTrue(db(t0.name == 'web2py2').count() == 1)
class TestBulkInsert(DALtest):
def testRun(self):
db = self.connect()
t0 = db.define_table('t0', Field('name'))
global ctr
ctr = 0
def test_after_insert(i, r):
self.assertIsInstance(i, OpRow)
global ctr
ctr += 1
return True
t0._after_insert.append(test_after_insert)
items = [{'name':'web2py_%s' % pos} for pos in range(0, 10, 1)]
t0.bulk_insert(items)
self.assertTrue(db(t0).count() == len(items))
for pos in range(0, 10, 1):
self.assertTrue(db(t0.name == 'web2py_%s' % pos).count() == 1)
self.assertTrue(ctr == len(items))
class TestRecordVersioning(DALtest):
def testRun(self):
db = self.connect()
db.define_table('t0', Field('name'), Field('is_active', writable=False,readable=False,default=True))
db.t0._enable_record_versioning(archive_name='t0_archive')
self.assertTrue('t0_archive' in db)
i_id = db.t0.insert(name='web2py1')
db.t0.insert(name='web2py2')
db(db.t0.name == 'web2py2').delete()
self.assertEqual(len(db(db.t0).select()), 1)
self.assertEqual(db(db.t0).count(), 1)
db(db.t0.id == i_id).update(name='web2py3')
self.assertEqual(len(db(db.t0).select()), 1)
self.assertEqual(db(db.t0).count(), 1)
self.assertEqual(len(db(db.t0_archive).select()), 2)
self.assertEqual(db(db.t0_archive).count(), 2)
@unittest.skipIf(IS_SQLITE, "Skip sqlite")
class TestConnection(unittest.TestCase):
def testRun(self):
# check connection is no longer active after close
db = DAL(DEFAULT_URI, check_reserved=['all'])
connection = db._adapter.connection
db.close()
self.assertRaises(Exception, connection.commit)
# check connection are reused with pool_size
connections = set()
for a in range(10):
db2 = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
c = db2._adapter.connection
connections.add(c)
db2.close()
self.assertEqual(len(connections), 1)
c = connections.pop()
c.commit()
c.close()
# check correct use of pool_size
dbs = []
for a in range(10):
db3 = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
dbs.append(db3)
for db in dbs:
db.close()
self.assertEqual(len(db3._adapter.POOLS[DEFAULT_URI]), 5)
for c in db3._adapter.POOLS[DEFAULT_URI]:
c.close()
db3._adapter.POOLS[DEFAULT_URI] = []
# Clean close if a connection is broken (closed explicity)
for a in range(10):
db4 = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
db4._adapter.connection.close()
db4.close()
self.assertEqual(len(db4._adapter.POOLS[DEFAULT_URI]), 0)
class TestSerializers(DALtest):
def testAsJson(self):
db = self.connect()
db.define_table('tt', Field('date_field', 'datetime'))
db.tt.insert(date_field=datetime.datetime.now())
rows = db().select(db.tt.ALL)
j=rows.as_json()
import json #standard library
json.loads(j)
def testSelectIterselect(self):
db = self.connect()
db.define_table('tt', Field('tt'))
db.tt.insert(tt='pydal')
methods = ['as_dict', 'as_csv', 'as_json', 'as_xml', 'as_list']
for method in methods:
rows = db(db.tt).select()
rowsI = db(db.tt).iterselect()
self.assertEqual(getattr(rows, method)(),
getattr(rowsI, method)(),
'failed %s' % method)
class TestIterselect(DALtest):
def testRun(self):
db = self.connect()
t0 = db.define_table('t0', Field('name'))
names = ['web2py', 'pydal', 'Massimo']
for n in names:
t0.insert(name=n)
rows = db(db.t0).select(orderby=db.t0.id)
for pos, r in enumerate(rows):
self.assertEqual(r.name, names[pos])
# Testing basic iteration
rows = db(db.t0).iterselect(orderby=db.t0.id)
for pos, r in enumerate(rows):
self.assertEqual(r.name, names[pos])
# Testing IterRows.first before basic iteration
rows = db(db.t0).iterselect(orderby=db.t0.id)
self.assertEqual(rows.first().name, names[0])
self.assertEqual(rows.first().name, names[0])
for pos, r in enumerate(rows):
self.assertEqual(r.name, names[pos])
# Testing IterRows.__nonzero__ before basic iteration
rows = db(db.t0).iterselect(orderby=db.t0.id)
if rows:
for pos, r in enumerate(rows):
self.assertEqual(r.name, names[pos])
# Empty iterRows
rows = db(db.t0.name=="IterRows").iterselect(orderby=db.t0.id)
self.assertEqual(bool(rows), False)
for pos, r in enumerate(rows):
self.assertEqual(r.name, names[pos])
# Testing IterRows.__getitem__
rows = db(db.t0).iterselect(orderby=db.t0.id)
self.assertEqual(rows[0].name, names[0])
self.assertEqual(rows[1].name, names[1])
# recall the same item
self.assertEqual(rows[1].name, names[1])
self.assertEqual(rows[2].name, names[2])
self.assertRaises(IndexError, rows.__getitem__, 1)
# Testing IterRows.next()
rows = db(db.t0).iterselect(orderby=db.t0.id)
for n in names:
self.assertEqual(next(rows).name, n)
self.assertRaises(StopIteration, next, rows)
# Testing IterRows.compact
rows = db(db.t0).iterselect(orderby=db.t0.id)
rows.compact = False
for n in names:
self.assertEqual(next(rows).t0.name, n)
@unittest.skipIf(IS_MSSQL, "Skip mssql")
def testMultiSelect(self):
# Iterselect holds the cursors until all elemets have been evaluated
# inner queries use new cursors
db = self.connect()
t0 = db.define_table('t0', Field('name'), Field('name_copy'))
db(db.t0).delete()
db.commit()
names = ['web2py', 'pydal', 'Massimo']
for n in names:
t0.insert(name=n)
c = 0
for r in db(db.t0).iterselect():
db.t0.update_or_insert(db.t0.id == r.id, name_copy = r.name)
c += 1
self.assertEqual(c, len(names), "The iterator is not looping over all elements")
self.assertEqual(db(db.t0).count(), len(names))
c = 0
for x in db(db.t0).iterselect(orderby=db.t0.id):
for y in db(db.t0).iterselect(orderby=db.t0.id):
db.t0.update_or_insert(db.t0.id == x.id, name_copy = x.name)
c += 1
self.assertEqual(c, len(names)*len(names))
self.assertEqual(db(db.t0).count(), len(names))
db._adapter.test_connection()
@unittest.skipIf(IS_SQLITE | IS_MSSQL, "Skip sqlite & ms sql")
def testMultiSelectWithCommit(self):
db = self.connect()
t0 = db.define_table('t0', Field('nn', 'integer'))
for n in xrange(1, 100, 1):
t0.insert(nn=n)
db.commit()
s = db.t0.nn.sum()
tot = db(db.t0).select(s).first()[s]
c = 0
for r in db(db.t0).iterselect(db.t0.ALL):
db.t0.update_or_insert(db.t0.id == r.id, nn = r.nn * 2)
db.commit()
c += 1
self.assertEqual(c, db(db.t0).count())
self.assertEqual(tot * 2, db(db.t0).select(s).first()[s])
db._adapter.test_connection()
if __name__ == '__main__':
unittest.main()
tearDownModule()