3014 lines
120 KiB
Python
3014 lines
120 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Basic unit tests
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
import os
|
|
import glob
|
|
import datetime
|
|
import json
|
|
import pickle
|
|
|
|
from pydal._compat import basestring, StringIO, integer_types, xrange, BytesIO, to_bytes
|
|
from pydal import DAL, Field
|
|
from pydal.helpers.classes import SQLALL, OpRow
|
|
from pydal.objects import Table, Expression, Row
|
|
from ._compat import unittest
|
|
from ._adapt import (
|
|
DEFAULT_URI, IS_POSTGRESQL, IS_SQLITE, IS_MSSQL, IS_MYSQL, IS_TERADATA)
|
|
from ._helpers import DALtest
|
|
|
|
# Bind the last entry of the compat layer's ``integer_types`` to the name
# ``long`` so the tests below can call ``long(11)``.
long = integer_types[-1]

# Announce the backend in use: the URI scheme first, then the full URI.
print('Testing against %s engine (%s)'
      % (DEFAULT_URI.partition(':')[0], DEFAULT_URI))
|
|
|
|
|
|
# Field type names iterated by TestFields.testFieldFormatters.
ALLOWED_DATATYPES = [
    'string', 'text', 'integer', 'boolean', 'double', 'blob',
    'date', 'time', 'datetime',
    'upload', 'password', 'json', 'bigint',
]
|
|
|
|
|
|
|
|
def setUpModule():
    """Drop leftover test tables before the module's tests run.

    Only acts when ``IS_MYSQL`` or ``IS_TERADATA`` is set; otherwise this
    is a no-op. A dedicated connection is opened for the cleanup and is
    always closed again, even if the cleanup loop is interrupted.
    """
    if not (IS_MYSQL or IS_TERADATA):
        return

    db = DAL(DEFAULT_URI, check_reserved=['all'])

    def clean_table(db, tablename):
        # Deliberately best effort: defining and dropping are attempted
        # independently, and a failure of either step is ignored.
        try:
            db.define_table(tablename)
        except Exception:
            pass
        try:
            db[tablename].drop()
        except Exception:
            pass

    try:
        for tablename in ['tt', 't0', 't1', 't2', 't3', 't4',
                          'easy_name', 'tt_archive', 'pet_farm', 'person']:
            clean_table(db, tablename)
    finally:
        db.close()
|
|
|
|
|
|
def tearDownModule():
    """Delete ``sql.log`` (if present) and every ``*.table`` file found in
    the current working directory."""
    stale = ['sql.log'] if os.path.isfile('sql.log') else []
    stale.extend(glob.glob('*.table'))
    for path in stale:
        os.unlink(path)
|
|
|
|
|
|
class TestFields(DALtest):
    """Field-level tests: naming rules, default lengths and labels, upload
    and blob handling, and insert/select round trips per field type."""

    def testFieldName(self):
        """
        - a "str" something
        - not a method or property of Table
        - "dotted-notation" friendly:
            - a valid python identifier
            - not a python keyword
            - not starting with underscore or an integer
            - not containing dots

        Basically, anything alphanumeric, no symbols, only underscore as
        punctuation
        """

        # Check that Fields cannot start with underscores
        self.assertRaises(SyntaxError, Field, '_abc', 'string')

        # Check that Fields cannot contain punctuation other than underscores
        self.assertRaises(SyntaxError, Field, 'a.bc', 'string')

        # Check that Fields cannot be a name of a method or property of Table
        for x in ['drop', 'on', 'truncate']:
            self.assertRaises(SyntaxError, Field, x, 'string')

        # Check that Fields allows underscores in the body of a field name.
        self.assertTrue(
            Field('a_bc', 'string'),
            "Field isn't allowing underscores in fieldnames.  It should.")

        # Check that Field names don't allow a python keyword
        self.assertRaises(SyntaxError, Field, 'True', 'string')
        self.assertRaises(SyntaxError, Field, 'elif', 'string')
        self.assertRaises(SyntaxError, Field, 'while', 'string')

        # Check that Field names don't allow a non-valid python identifier
        non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
        for a in non_valid_examples:
            self.assertRaises(SyntaxError, Field, a, 'string')

        # Check that Field names don't allow a unicode string
        non_valid_examples = ["ℙƴ☂ℌøἤ", u"ℙƴ☂ℌøἤ",
                              u'àè', u'ṧøмℯ', u'тεṧт', u'♥αłüℯṧ',
                              u'ℊεᾔ℮яαт℮∂', u'♭ƴ', u'ᾔ☤ρℌℓ☺ḓ']
        for a in non_valid_examples:
            self.assertRaises(SyntaxError, Field, a, 'string')

    def testFieldTypes(self):
        """Check default lengths and Table-to-reference type conversion."""

        # Check that string, and password default length is 512
        for typ in ['string', 'password']:
            self.assertEqual(
                Field('abc', typ).length, 512,
                "Default length for type '%s' is not 512" % typ)

        # Check that upload default length is 512
        self.assertEqual(Field('abc', 'upload').length, 512,
                         "Default length for type 'upload' is not 512")

        # Check that Tables passed in the type creates a reference
        self.assertEqual(
            Field('abc', Table(None, 'temp')).type, 'reference temp',
            'Passing a Table does not result in a reference type.')

    def testFieldLabels(self):
        """Check that a label is derived from the field name."""

        # Check that a label is successfully built from the supplied fieldname
        self.assertEqual(Field('abc', 'string').label, 'Abc',
                         'Label built is incorrect')
        self.assertEqual(Field('abc_def', 'string').label, 'Abc Def',
                         'Label built is incorrect')

    def testFieldFormatters(self):  # Formatter should be called Validator
        """Call ``formatter`` once for every type in ALLOWED_DATATYPES."""

        # Test the default formatters
        # NOTE(review): the isinstance() results below are discarded, so this
        # test only verifies that formatter() does not raise.  Turning them
        # into real assertions needs confirmation of what formatter() returns
        # for each type.
        for typ in ALLOWED_DATATYPES:
            f = Field('abc', typ)
            if typ not in ['date', 'time', 'datetime']:
                isinstance(f.formatter('test'), str)
            else:
                isinstance(f.formatter(datetime.datetime.now()), str)

    def testUploadField(self):
        """Store a temporary file in an upload field and read it back.

        The round trip is done with ``uploadfolder`` and, when the optional
        ``fs`` package can be imported, again with an in-memory ``uploadfs``.
        """
        import tempfile

        content = b"this is the stream content"

        # The context manager guarantees the temporary file is closed (and
        # thereby removed) even when an assertion fails or ``fs`` is missing.
        with tempfile.NamedTemporaryFile() as stream:
            stream.write(content)
            # rewind before inserting
            stream.seek(0)

            db = self.connect()
            db.define_table('tt', Field('fileobj', 'upload',
                                        uploadfolder=tempfile.gettempdir(),
                                        autodelete=True))
            f_id = db.tt.insert(fileobj=stream)

            row = db.tt[f_id]
            (retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj)

            # name should be the same
            self.assertEqual(retr_name, os.path.basename(stream.name))
            # content should be the same
            retr_content = retr_stream.read()
            self.assertEqual(retr_content, content)

            # close streams!
            retr_stream.close()

            # delete
            row.delete_record()

            # drop
            db.tt.drop()

            # this part is triggered only if fs (AKA pyfilesystem) module is
            # installed
            try:
                from fs.memoryfs import MemoryFS

                # rewind before inserting
                stream.seek(0)
                db.define_table('tt', Field('fileobj', 'upload',
                                            uploadfs=MemoryFS(),
                                            autodelete=True))

                f_id = db.tt.insert(fileobj=stream)

                row = db.tt[f_id]
                (retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj)

                # name should be the same
                self.assertEqual(retr_name, os.path.basename(stream.name))
                # content should be the same
                retr_content = retr_stream.read()
                self.assertEqual(retr_content, content)

                # close the retrieved stream; the source stream is closed by
                # the enclosing ``with`` block
                retr_stream.close()

                # delete
                row.delete_record()

                # drop
                db.tt.drop()

            except ImportError:
                pass

    def testBlobBytes(self):
        """Round-trip pickled bytes through a blob field."""
        # Test blob with latin1 encoded bytes
        db = self.connect()
        obj = pickle.dumps('0')
        db.define_table('tt', Field('aa', 'blob'))
        self.assertEqual(db.tt.insert(aa=obj), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, obj)
        self.assertEqual(db.tt[1].aa, obj)
        self.assertEqual(BytesIO(to_bytes(db.tt[1].aa)).read(), obj)
        db.tt.drop()

    def testRun(self):
        """Test all field types and their return values"""
        db = self.connect()
        for ft in ['string', 'text', 'password', 'upload', 'blob']:
            db.define_table('tt', Field('aa', ft, default=''))
            self.assertEqual(db.tt.insert(aa='ö'), 1)
            self.assertEqual(db().select(db.tt.aa)[0].aa, 'ö')
            db.tt.drop()
        db.define_table('tt', Field('aa', 'integer', default=1))
        self.assertEqual(db.tt.insert(aa=3), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
        db.tt.drop()

        db.define_table('tt', Field('aa', 'string'))
        ucs = 'A\xc3\xa9 A'
        self.assertEqual(db.tt.insert(aa=ucs), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, ucs)
        self.assertEqual(db().select(db.tt.aa.with_alias('zz'))[0].zz, ucs)
        db.tt.drop()

        db.define_table('tt', Field('aa', 'double', default=1))
        self.assertEqual(db.tt.insert(aa=3.1), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
        db.tt.drop()
        db.define_table('tt', Field('aa', 'boolean', default=True))
        self.assertEqual(db.tt.insert(aa=True), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, True)
        db.tt.drop()
        db.define_table('tt', Field('aa', 'json', default={}))
        # test different python objects for correct serialization in json
        objs = [
            {'a': 1, 'b': 2},
            [1, 2, 3],
            'abc',
            True,
            False,
            None,
            11,
            14.3,
            long(11)
        ]
        for obj in objs:
            rtn_id = db.tt.insert(aa=obj)
            rtn = db(db.tt.id == rtn_id).select().first().aa
            self.assertEqual(obj, rtn)
        db.tt.drop()
        db.define_table('tt', Field('aa', 'date',
                                    default=datetime.date.today()))
        t0 = datetime.date.today()
        self.assertEqual(db.tt.insert(aa=t0), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
        db.tt.drop()
        db.define_table('tt', Field('aa', 'datetime',
                                    default=datetime.datetime.today()))
        t0 = datetime.datetime(
            1971,
            12,
            21,
            10,
            30,
            55,
            0,
        )
        self.assertEqual(db.tt.insert(aa=t0), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)

        # Row APIs
        row = db().select(db.tt.aa)[0]
        self.assertEqual(db.tt[1].aa, t0)
        self.assertEqual(db.tt['aa'], db.tt.aa)
        self.assertEqual(db.tt(1).aa, t0)
        self.assertTrue(db.tt(1, aa=None) == None)
        self.assertFalse(db.tt(1, aa=t0) == None)
        self.assertEqual(row.aa, t0)
        self.assertEqual(row['aa'], t0)
        self.assertEqual(row['tt.aa'], t0)
        self.assertEqual(row('tt.aa'), t0)

        # Lazy and Virtual fields
        db.tt.b = Field.Virtual(lambda row: row.tt.aa)
        db.tt.c = Field.Lazy(lambda row: row.tt.aa)
        row = db().select(db.tt.aa)[0]
        self.assertEqual(row.b, t0)
        self.assertEqual(row.c(), t0)

        db.tt.drop()
        db.define_table('tt', Field('aa', 'time', default='11:30'))
        t0 = datetime.time(10, 30, 55)
        self.assertEqual(db.tt.insert(aa=t0), 1)
        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
        db.tt.drop()

        # aggregation type detection
        db.define_table('tt', Field('aa', 'datetime',
                                    default=datetime.datetime.today()))
        t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0)
        self.assertEqual(db.tt.insert(aa=t0), 1)
        self.assertEqual(db().select(db.tt.aa.min())[0][db.tt.aa.min()], t0)
        db.tt.drop()
|
|
|
|
|
|
class TestTables(unittest.TestCase):
    """Validation of the names accepted by the Table constructor."""

    def testTableNames(self):
        """
        A table name must be:

        - a "str" something
        - not a method or property of DAL
        - "dotted-notation" friendly:
            - a valid python identifier
            - not a python keyword
            - not starting with underscore or an integer
            - not containing dots

        In short: alphanumeric characters only, with underscore as the one
        permitted punctuation mark.
        """

        def rejected(tablename):
            # Building a Table with this name has to raise SyntaxError.
            with self.assertRaises(SyntaxError):
                Table(None, tablename)

        # A leading underscore is refused
        rejected('_abc')

        # Punctuation other than the underscore is refused
        rejected('a.bc')

        # Names of DAL methods or properties are refused
        for reserved in ('define_table', 'tables', 'as_dict'):
            rejected(reserved)

        # An underscore inside the table name is fine
        self.assertTrue(
            Table(None, 'a_bc'),
            "Table isn't allowing underscores in tablename.  It should.")

        # Python keywords are refused
        for keyword in ('True', 'elif', 'while'):
            rejected(keyword)

        # Strings that are not valid python identifiers are refused
        for bad_name in ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]:
            rejected(bad_name)

        # Unicode strings are refused
        unicode_names = ["ℙƴ☂ℌøἤ", u"ℙƴ☂ℌøἤ",
                         u'àè', u'ṧøмℯ', u'тεṧт', u'♥αłüℯṧ',
                         u'ℊεᾔ℮яαт℮∂', u'♭ƴ', u'ᾔ☤ρℌℓ☺ḓ']
        for bad_name in unicode_names:
            rejected(bad_name)
|
|
|
|
|
|
class TestAll(unittest.TestCase):
    """Checks the string rendering of SQLALL for a db-less table."""

    def setUp(self):
        # Table without a database, declaring two fields explicitly.
        columns = [Field('name'), Field('birthdate')]
        self.pt = Table(None, 'PseudoTable', *columns)

    def testSQLALL(self):
        rendered = str(SQLALL(self.pt))
        expected = 'PseudoTable.id, PseudoTable.name, PseudoTable.birthdate'
        self.assertEqual(rendered, expected)
|
|
|
|
|
|
class TestTable(DALtest):
    """Table construction, aliasing and field inheritance."""

    def testTableCreation(self):
        # Anything that is neither a Field nor a Table must be refused
        with self.assertRaises(SyntaxError):
            Table(None, 'test', None)

        first = Field('firstname', 'string')
        last = Field('lastname', 'string')
        persons = Table(None, 'persons', first, last)

        # Both declared fields are present
        self.assertTrue({'firstname', 'lastname'} <= set(persons.fields))

        # ALL lists the declared fields in order
        self.assertIn('persons.firstname, persons.lastname',
                      str(persons.ALL))

    def testTableAlias(self):
        db = self.connect()
        persons = Table(db, 'persons',
                        Field('firstname', 'string'),
                        Field('lastname', 'string'))
        aliens = persons.with_alias('aliens')

        # The alias is a distinct object exposing the same field names
        self.assertIsNot(persons, aliens)
        self.assertEqual(set(persons.fields), set(aliens.fields))

    def testTableInheritance(self):
        persons = Table(None, 'persons',
                        Field('firstname', 'string'),
                        Field('lastname', 'string'))
        # Passing a Table among the fields pulls its fields in
        customers = Table(None, 'customers',
                          Field('items_purchased', 'integer'),
                          persons)
        wanted = {'items_purchased', 'firstname', 'lastname'}
        self.assertTrue(wanted <= set(customers.fields))
|
|
|
|
|
|
class TestInsert(DALtest):
    """Insert, count, update and delete on a single-column table."""

    def testRun(self):
        db = self.connect()
        db.define_table('tt', Field('aa'))

        # Ids returned by three consecutive inserts; under Teradata every
        # insert is expected to return 1.
        returned_ids = (1, 1, 1) if IS_TERADATA else (1, 2, 3)
        for expected_id in returned_ids:
            self.assertEqual(db.tt.insert(aa='1'), expected_id)

        self.assertEqual(db(db.tt.aa == '1').count(), 3)
        self.assertEqual(db(db.tt.aa == '2').isempty(), True)

        # Rewrite every row, then verify and remove them
        self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
        self.assertEqual(db(db.tt.aa == '2').count(), 3)
        self.assertEqual(db(db.tt.aa == '2').isempty(), False)
        self.assertEqual(db(db.tt.aa == '2').delete(), 3)
        self.assertEqual(db(db.tt.aa == '2').isempty(), True)
|
|
|
|
|
|
class TestSelect(DALtest):
    """Select-related tests: comparison operators, list fields, grouping,
    distinct, coalesce and table alias collisions."""

    def testRun(self):
        """Exercise ordering, limitby and the comparison/boolean operators."""
        db = self.connect()
        db.define_table('tt', Field('aa'))
        self.assertEqual(db.tt.insert(aa='1'), 1)
        if not IS_TERADATA:
            self.assertEqual(db.tt.insert(aa='2'), 2)
            self.assertEqual(db.tt.insert(aa='3'), 3)
        else:
            self.assertEqual(db.tt.insert(aa='2'), 1)
            self.assertEqual(db.tt.insert(aa='3'), 1)
        self.assertEqual(db(db.tt.id > 0).count(), 3)
        self.assertEqual(
            db(db.tt.id > 0).select(orderby=~db.tt.aa | db.tt.id)[0].aa,
            '3')
        self.assertEqual(len(db(db.tt.id > 0).select(limitby=(1, 2))), 1)
        self.assertEqual(db(db.tt.id > 0).select(limitby=(1, 2))[0].aa, '2')
        self.assertEqual(len(db().select(db.tt.ALL)), 3)
        # ``== None`` / ``!= None`` are intentional here: the comparison
        # builds a query object, so ``is None`` cannot be used instead.
        self.assertEqual(db(db.tt.aa == None).count(), 0)
        self.assertEqual(db(db.tt.aa != None).count(), 3)
        self.assertEqual(db(db.tt.aa > '1').count(), 2)
        self.assertEqual(db(db.tt.aa >= '1').count(), 3)
        self.assertEqual(db(db.tt.aa == '1').count(), 1)
        self.assertEqual(db(db.tt.aa != '1').count(), 2)
        self.assertEqual(db(db.tt.aa < '3').count(), 2)
        self.assertEqual(db(db.tt.aa <= '3').count(), 3)
        self.assertEqual(db(db.tt.aa > '1')(db.tt.aa < '3').count(), 1)
        self.assertEqual(db((db.tt.aa > '1') & (db.tt.aa < '3')).count(), 1)
        self.assertEqual(db((db.tt.aa > '1') | (db.tt.aa < '3')).count(), 3)
        self.assertEqual(db((db.tt.aa > '1') & ~(db.tt.aa > '2')).count(), 1)
        self.assertEqual(db(~(db.tt.aa > '1') & (db.tt.aa > '2')).count(), 0)
        # Test for REGEX_TABLE_DOT_FIELD
        self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa], '1')

    def testTestQuery(self):
        """Call the adapter's connection test on a fresh connection."""
        db = self.connect()
        db._adapter.test_connection()

    def testListInteger(self):
        """Round-trip a list of integers through a list:integer field."""
        db = self.connect()
        db.define_table('tt',
                        Field('aa', 'list:integer'))
        values = [1, 2, 3, 4, 5]
        db.tt.insert(aa=values)
        self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa], values)

    def testListString(self):
        """Round-trip a list of strings through a list:string field."""
        db = self.connect()
        db.define_table('tt',
                        Field('aa', 'list:string'))
        values = ['a', 'b', 'c']
        db.tt.insert(aa=values)
        self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa], values)

    def testListReference(self):
        """Round-trip id lists through a list:reference field."""
        db = self.connect()
        db.define_table('t0', Field('aa', 'string'))
        db.define_table('tt', Field('t0_id', 'list:reference t0'))
        id_a1 = db.t0.insert(aa='test1')
        id_a2 = db.t0.insert(aa='test2')
        ref1 = [id_a1]
        ref2 = [id_a2]
        ref3 = [id_a1, id_a2]
        db.tt.insert(t0_id=ref1)
        self.assertEqual(
            db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1)
        db.tt.insert(t0_id=ref2)
        self.assertEqual(
            db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2)
        db.tt.insert(t0_id=ref3)
        self.assertEqual(
            db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3)

        self.assertEqual(db(db.tt.t0_id == ref3).count(), 1)

    def testGroupByAndDistinct(self):
        """Check groupby, having, distinct and count(distinct) results."""
        db = self.connect()
        db.define_table('tt',
                        Field('aa'),
                        Field('bb', 'integer'),
                        Field('cc', 'integer'))
        # (aa, bb, cc) rows, inserted in exactly this order.
        rows = [
            ('4', 1, 1),
            ('3', 2, 1),
            ('3', 1, 1),
            ('1', 1, 1),
            ('1', 2, 1),
            ('1', 3, 1),
            ('1', 4, 1),
            ('2', 1, 1),
            ('2', 2, 1),
            ('2', 3, 1),
        ]
        for aa, bb, cc in rows:
            db.tt.insert(aa=aa, bb=bb, cc=cc)
        self.assertEqual(db(db.tt).count(), 10)

        # test groupby
        result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa)
        self.assertEqual(len(result), 4)
        result = db().select(db.tt.aa, db.tt.bb.sum(),
                             groupby=db.tt.aa, orderby=db.tt.aa)
        self.assertEqual(tuple(result.response[2]), ('3', 3))
        result = db().select(db.tt.aa, db.tt.bb.sum(),
                             groupby=db.tt.aa, orderby=~db.tt.aa)
        self.assertEqual(tuple(result.response[1]), ('3', 3))
        result = db().select(db.tt.aa, db.tt.bb, db.tt.cc.sum(),
                             groupby=db.tt.aa | db.tt.bb,
                             orderby=(db.tt.aa | ~db.tt.bb))
        self.assertEqual(tuple(result.response[4]), ('2', 3, 1))
        result = db().select(db.tt.aa, db.tt.bb.sum(),
                             groupby=db.tt.aa, orderby=~db.tt.aa,
                             limitby=(1, 2))
        self.assertEqual(len(result), 1)
        self.assertEqual(tuple(result.response[0]), ('3', 3))
        result = db().select(db.tt.aa, db.tt.bb.sum(),
                             groupby=db.tt.aa, limitby=(0, 3))
        self.assertEqual(len(result), 3)
        self.assertEqual(tuple(result.response[2]), ('3', 3))

        # test having
        self.assertEqual(
            len(db().select(db.tt.aa, db.tt.bb.sum(),
                            groupby=db.tt.aa, having=db.tt.bb.sum() > 2)),
            3)

        # test distinct
        result = db().select(db.tt.aa, db.tt.cc, distinct=True)
        self.assertEqual(len(result), 4)
        result = db().select(db.tt.cc, distinct=True, groupby=db.tt.cc)
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].cc, 1)
        result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa)
        self.assertEqual(result[2].aa, '2')
        self.assertEqual(result[1].aa, '3')
        result = db().select(db.tt.aa, db.tt.bb,
                             distinct=True, orderby=(db.tt.aa | ~db.tt.bb))
        self.assertEqual(tuple(result.response[4]), ('2', 3))
        result = db().select(db.tt.aa,
                             distinct=True, orderby=~db.tt.aa, limitby=(1, 2))
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].aa, '3')

        # test count distinct
        db.tt.insert(aa='2', bb=3, cc=1)
        self.assertEqual(db(db.tt).count(distinct=db.tt.aa), 4)
        self.assertEqual(db(db.tt.aa).count(db.tt.aa), 4)
        self.assertEqual(db(db.tt.aa).count(), 11)
        count = db.tt.aa.count()
        self.assertEqual(db(db.tt).select(count).first()[count], 11)

        count = db.tt.aa.count(distinct=True)
        # Named ``total`` so the builtin ``sum`` is not shadowed.
        total = db.tt.bb.sum()
        result = db(db.tt).select(count, total)
        self.assertEqual(tuple(result.response[0]), (4, 23))

    def testCoalesce(self):
        """Check coalesce() and coalesce_zero() on rows with missing values."""
        db = self.connect()
        db.define_table('tt', Field('aa'), Field('bb'), Field('cc'),
                        Field('dd'))
        db.tt.insert(aa='xx')
        db.tt.insert(aa='xx', bb='yy')
        db.tt.insert(aa='xx', bb='yy', cc='zz')
        db.tt.insert(aa='xx', bb='yy', cc='zz', dd='')
        result = db(db.tt).select(
            db.tt.dd.coalesce(db.tt.cc, db.tt.bb, db.tt.aa))
        self.assertEqual(result.response[0][0], 'xx')
        self.assertEqual(result.response[1][0], 'yy')
        self.assertEqual(result.response[2][0], 'zz')
        self.assertEqual(result.response[3][0], '')
        db.tt.drop()

        db.define_table('tt', Field('aa', 'integer'), Field('bb'))
        db.tt.insert(bb='')
        db.tt.insert(aa=1)
        result = db(db.tt).select(db.tt.aa.coalesce_zero())
        self.assertEqual(result.response[0][0], 0)
        self.assertEqual(result.response[1][0], 1)

    def testTableAliasCollisions(self):
        """Run insert/update/select/delete after creating aliases whose
        names collide with the other table's real name."""
        db = self.connect()
        db.define_table('t1', Field('aa'))
        db.define_table('t2', Field('bb'))
        t1, t2 = db.t1, db.t2
        t1.with_alias('t2')
        t2.with_alias('t1')

        # Passing tables by name will result in exception
        t1.insert(aa='test')
        t2.insert(bb='foo')
        db(t1.id > 0).update(aa='bar')
        having = (t1.aa != None)
        join = [t2.on(t1.aa == t2.bb)]
        db(t1.aa == t2.bb).select(t1.aa, groupby=t1.aa, having=having,
                                  orderby=t1.aa)
        db(t1.aa).select(t1.aa, join=join, groupby=t1.aa, having=having,
                         orderby=t1.aa)
        db(t1.aa).select(t1.aa, left=join, groupby=t1.aa, having=having,
                         orderby=t1.aa)
        db(t1.id > 0).delete()
|
|
|
|
|
|
class TestSubselect(DALtest):
    """Tests for the objects returned by ``nested_select``."""

    def testMethods(self):
        db = self.connect()
        db.define_table('tt', Field('aa', 'integer'), Field('bb'))
        data = [
            {'aa': 1, 'bb': 'foo'},
            {'aa': 1, 'bb': 'bar'},
            {'aa': 2, 'bb': 'foo'},
        ]
        for record in data:
            db.tt.insert(**record)

        def check_rows(rows):
            # Each row carries the stored record plus the aliased expression.
            for pos, record in enumerate(data):
                self.assertEqual(rows[pos]['tt'].as_dict(), record)
                self.assertEqual(rows[pos]['exp'], record['aa'] + 1)

        def check_unnamed(subselect):
            # These attributes are expected to be None before and after
            # aliasing.
            self.assertEqual(subselect._rname, None)
            self.assertEqual(subselect._raw_rname, None)
            self.assertEqual(subselect._dalname, None)

        fields = [db.tt.aa, db.tt.bb, db.tt.aa + 2,
                  (db.tt.aa + 1).with_alias('exp')]
        sub = db(db.tt).nested_select(*fields, orderby=db.tt.id)

        # Fields exposed by the subselect object
        self.assertEqual(sorted(['aa', 'bb', 'exp']), sorted(sub.fields))
        for fieldname in sub.fields:
            self.assertIsInstance(sub[fieldname], Field)
        for member in sub:
            self.assertIsInstance(member, Field)
        self.assertEqual(len(list(sub)), len(sub.fields))
        for fieldname, member in zip(sub.fields, sub):
            self.assertIs(sub[fieldname], member)
            self.assertIs(getattr(sub, fieldname), member)
        filtered = sub._filter_fields(dict(aa=1, exp=2, foo=3))
        self.assertEqual(filtered, dict(aa=1, exp=2))

        # Rows produced by executing the subselect
        result = sub()
        self.assertEqual(len(result), len(data))
        check_rows(result)
        result = db.executesql(str(sub))
        for pos, record in enumerate(data):
            raw = [record['aa'], record['bb'],
                   record['aa'] + 2, record['aa'] + 1]
            self.assertEqual(list(result[pos]), raw)

        # Without an alias the query expansion methods must fail
        check_unnamed(sub)
        with self.assertRaises(SyntaxError):
            sub.query_name()
        with self.assertRaises(SyntaxError):
            sub.sql_shortref
        with self.assertRaises(SyntaxError):
            sub.on(sub.aa != None)

        # Same checks once an alias is attached
        sub = sub.with_alias('foo')
        result = sub()
        check_rows(result)

        # Query expansion methods work now
        check_unnamed(sub)
        self.assertEqual(sub.query_name()[0], str(sub))
        self.assertEqual(sub.sql_shortref, db._adapter.dialect.quote('foo'))
        self.assertIsInstance(sub.on(sub.aa != None), Expression)

    def testSelectArguments(self):
        db = self.connect()
        db.define_table('tt', Field('aa', 'integer'), Field('bb'))
        data = [
            {'aa': 1, 'bb': 'foo'},
            {'aa': 1, 'bb': 'bar'},
            {'aa': 2, 'bb': 'foo'},
            {'aa': 3, 'bb': 'foo'},
            {'aa': 3, 'bb': 'baz'},
        ]
        # (t1.aa, total, cnt) triples expected from the query below
        expected = [(1, None, 0), (2, 2, 2), (2, 2, 2), (3, 4, 3), (3, 8, 6)]
        for record in data:
            db.tt.insert(**record)

        # Every select clause at once, in a stand-alone subselect
        t1 = db.tt.with_alias('t1')
        t2 = db.tt.with_alias('t2')
        fields = [t1.aa, t2.aa.sum().with_alias('total'),
                  t2.aa.count().with_alias('cnt')]
        clauses = dict(
            join=t1.on(db.tt.bb != t1.bb),
            left=t2.on(t1.aa > t2.aa),
            groupby=db.tt.bb | t1.aa,
            having=db.tt.aa.count() > 1,
            orderby=t1.aa | t2.aa.count(),
            limitby=(1, 6),
        )
        sub = db(db.tt.aa != 2).nested_select(*fields, **clauses)
        result = sub()
        self.assertEqual(len(result), len(expected))
        for pos, (aa, total, cnt) in enumerate(expected):
            self.assertEqual(result[pos]['t1']['aa'], aa)
            self.assertEqual(result[pos]['total'], total)
            self.assertEqual(result[pos]['cnt'], cnt)

        # Again, nested inside another query; the alias deliberately
        # reuses the name of an existing table.
        t3 = db.tt.with_alias('t3')
        sub = sub.with_alias('tt')
        query = (t3.bb == 'foo') & (t3.aa == sub.aa)
        order = t3.aa | sub.cnt
        result = db(query).select(t3.aa, sub.total, sub.cnt, orderby=order)
        for pos, (aa, total, cnt) in enumerate(expected):
            self.assertEqual(result[pos]['t3']['aa'], aa)
            self.assertEqual(result[pos]['tt']['total'], total)
            self.assertEqual(result[pos]['tt']['cnt'], cnt)

        # The "distinct" modifier on its own
        sub = db(db.tt.aa != 2).nested_select(db.tt.aa, distinct=True)
        result = sub().as_list()
        self.assertEqual(result, [dict(aa=1), dict(aa=3)])

    def testCorrelated(self):
        db = self.connect()
        db.define_table('t1', Field('aa', 'integer'), Field('bb'),
                        Field('mark', 'integer'))
        db.define_table('t2', Field('aa', 'integer'), Field('cc'))
        db.define_table('t3', Field('aa', 'integer'))
        data_t1 = [
            {'aa': 1, 'bb': 'bar'}, {'aa': 1, 'bb': 'foo'},
            {'aa': 2, 'bb': 'foo'}, {'aa': 2, 'bb': 'test'},
            {'aa': 3, 'bb': 'baz'}, {'aa': 3, 'bb': 'foo'},
        ]
        data_t2 = [
            {'aa': 1, 'cc': 'foo'}, {'aa': 2, 'cc': 'bar'},
            {'aa': 3, 'cc': 'baz'},
        ]
        expected_cor = [(1, 'foo'), (3, 'baz')]
        expected_leftcor = [(1, 'foo'), (2, None), (3, 'baz')]
        expected_uncor = [
            (1, 'bar'), (1, 'foo'), (2, 'foo'), (3, 'baz'), (3, 'foo')
        ]

        def check_plain(rows, expected):
            # Rows selected from a single table: compare aa/bb directly.
            self.assertEqual(len(rows), len(expected))
            for pos, (aa, bb) in enumerate(expected):
                self.assertEqual(rows[pos]['aa'], aa)
                self.assertEqual(rows[pos]['bb'], bb)

        def check_joined(rows, expected, right):
            # Rows joining t3 with the table keyed as ``right``.
            self.assertEqual(len(rows), len(expected))
            for pos, (aa, bb) in enumerate(expected):
                self.assertEqual(rows[pos]['t3']['aa'], aa)
                self.assertEqual(rows[pos][right]['bb'], bb)

        for record in data_t1:
            db.t1.insert(**record)
        for record in data_t2:
            db.t2.insert(**record)
            db.t3.insert(aa=record['aa'])

        # Correlated subqueries
        subquery = db.t1.aa == db.t2.aa
        subfields = [db.t2.cc]
        sub = db(subquery).nested_select(*subfields).with_alias('sub')
        query = db.t1.bb.belongs(sub)
        order = db.t1.aa | db.t1.bb
        result = db(query).select(db.t1.aa, db.t1.bb, orderby=order)
        check_plain(result, expected_cor)

        join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
        order = db.t3.aa | db.t1.bb
        result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join,
                                  orderby=order)
        check_joined(result, expected_cor, 't1')

        left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
        result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left,
                                  orderby=order)
        check_joined(result, expected_leftcor, 't1')

        order = db.t1.aa | db.t1.bb
        db(db.t1.bb.belongs(sub)).update(mark=1)
        result = db(db.t1.mark == 1).select(db.t1.aa, db.t1.bb,
                                            orderby=order)
        check_plain(result, expected_cor)

        db(~db.t1.bb.belongs(sub)).delete()
        result = db(db.t1.id > 0).select(db.t1.aa, db.t1.bb, orderby=order)
        check_plain(result, expected_cor)

        # Restore t1 to its initial content
        db(db.t1.id > 0).delete()
        for record in data_t1:
            db.t1.insert(**record)

        # Uncorrelated subqueries
        sub = db(subquery).nested_select(*subfields, correlated=False)
        query = db.t1.bb.belongs(sub)
        order = db.t1.aa | db.t1.bb
        result = db(query).select(db.t1.aa, db.t1.bb, orderby=order)
        check_plain(result, expected_uncor)

        join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
        order = db.t3.aa | db.t1.bb
        result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join,
                                  orderby=order)
        check_joined(result, expected_uncor, 't1')

        left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
        result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left,
                                  orderby=order)
        check_joined(result, expected_uncor, 't1')
        # MySQL does not support subqueries with uncorrelated references
        # to target table

        # Correlation prevented by alias in parent select
        tmp = db.t1.with_alias('tmp')
        sub = db(subquery).nested_select(*subfields)
        query = tmp.bb.belongs(sub)
        order = tmp.aa | tmp.bb
        result = db(query).select(tmp.aa, tmp.bb, orderby=order)
        check_plain(result, expected_uncor)

        join = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))]
        order = db.t3.aa | tmp.bb
        result = db(db.t3).select(db.t3.aa, tmp.bb, join=join, orderby=order)
        check_joined(result, expected_uncor, 'tmp')

        left = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))]
        result = db(db.t3).select(db.t3.aa, tmp.bb, left=left, orderby=order)
        check_joined(result, expected_uncor, 'tmp')
        # SQLite does not support aliasing target table in UPDATE/DELETE
        # MySQL does not support subqueries with uncorrelated references
        # to target table
|
|
|
|
class TestAddMethod(DALtest):
    """Attaching a custom method to a table through ``add_method``."""

    def testRun(self):
        db = self.connect()
        db.define_table('tt', Field('aa'))

        # Register the function below on the table as ``all``.
        @db.tt.add_method.all
        def select_all(table, orderby=None):
            rows = table._db(table)
            return rows.select(orderby=orderby)

        # Under Teradata every insert is expected to return 1.
        returned_ids = (1, 1, 1) if IS_TERADATA else (1, 2, 3)
        for expected_id in returned_ids:
            self.assertEqual(db.tt.insert(aa='1'), expected_id)

        everything = db.tt.all()
        self.assertEqual(len(everything), 3)
|
|
|
|
|
|
class TestBelongs(DALtest):
    """``belongs`` with a literal tuple and with nested ``_select`` SQL."""

    def testRun(self):
        db = self.connect()
        db.define_table('tt', Field('aa'))

        # Insert '1', '2', '3'; under Teradata every insert is expected to
        # return 1.
        returned_ids = (1, 1, 1) if IS_TERADATA else (1, 2, 3)
        for value, expected_id in zip(('1', '2', '3'), returned_ids):
            self.assertEqual(db.tt.insert(aa=value), expected_id)

        def matching(query):
            # Number of rows selected by ``query``.
            return db(query).count()

        def aa_select(query):
            # Result of ``_select`` on column aa, restricted by ``query``.
            return db(query)._select(db.tt.aa)

        # Literal tuple
        self.assertEqual(matching(db.tt.aa.belongs(('1', '3'))), 2)

        # One level of nesting, filtered by id
        self.assertEqual(
            matching(db.tt.aa.belongs(aa_select(db.tt.id > 2))), 1)

        # One level of nesting, filtered by the literal tuple
        inner = aa_select(db.tt.aa.belongs(('1', '3')))
        self.assertEqual(matching(db.tt.aa.belongs(inner)), 2)

        # Two levels of nesting
        innermost = aa_select(db.tt.aa.belongs(('1', '3')))
        middle = aa_select(db.tt.aa.belongs(innermost))
        self.assertEqual(matching(db.tt.aa.belongs(middle)), 2)
|
|
|
|
|
|
class TestContains(DALtest):
    """Exercise ``contains`` on ``list:string`` and plain string fields."""

    def testRun(self):
        """Search for strings, integers, other fields and values that hold
        LIKE wildcards."""
        db = self.connect()

        def hits(field, needle, **options):
            return db(field.contains(needle, **options)).count()

        # The Teradata branch expects 1 back from every insert.
        expected_ids = (1, 1, 1) if IS_TERADATA else (1, 2, 3)

        db.define_table('tt', Field('aa', 'list:string'), Field('bb', 'string'))
        records = (
            dict(aa=['aaa', 'bbb'], bb='aaa'),
            dict(aa=['bbb', 'ddd'], bb='abb'),
            dict(aa=['eee', 'aaa'], bb='acc'),
        )
        for record, expected_id in zip(records, expected_ids):
            self.assertEqual(db.tt.insert(**record), expected_id)
        tt = db.tt
        self.assertEqual(hits(tt.aa, 'aaa'), 2)
        self.assertEqual(hits(tt.aa, 'bbb'), 2)
        self.assertEqual(hits(tt.aa, 'aa'), 0)
        self.assertEqual(hits(tt.bb, 'a'), 3)
        self.assertEqual(hits(tt.bb, 'b'), 1)
        self.assertEqual(hits(tt.bb, 'd'), 0)
        self.assertEqual(hits(tt.aa, tt.bb), 1)
        # case-sensitivity tests: a non-zero count here means the backend
        # matched the lower-case rows with an upper-case pattern
        is_case_insensitive = db(tt.bb.like('%AA%')).count()
        if is_case_insensitive:
            self.assertEqual(hits(tt.aa, 'AAA'), 2)
            self.assertEqual(hits(tt.bb, 'A'), 3)
        else:
            # NOTE(review): extent of this branch was reconstructed; confirm
            # that the case_sensitive=False checks belong here.
            self.assertEqual(hits(tt.aa, 'AAA', case_sensitive=True), 0)
            self.assertEqual(hits(tt.bb, 'A', case_sensitive=True), 0)
            self.assertEqual(hits(tt.aa, 'AAA', case_sensitive=False), 2)
            self.assertEqual(hits(tt.bb, 'A', case_sensitive=False), 3)
        db.tt.drop()

        # integers in string fields
        db.define_table('tt', Field('aa', 'list:string'), Field('bb', 'string'),
                        Field('cc', 'integer'))
        records = (
            dict(aa=['123', '456'], bb='123', cc=12),
            dict(aa=['124', '456'], bb='123', cc=123),
            dict(aa=['125', '457'], bb='23', cc=125),
        )
        for record, expected_id in zip(records, expected_ids):
            self.assertEqual(db.tt.insert(**record), expected_id)
        tt = db.tt
        self.assertEqual(hits(tt.aa, 123), 1)
        self.assertEqual(hits(tt.aa, 23), 0)
        self.assertEqual(hits(tt.aa, tt.cc), 1)
        self.assertEqual(hits(tt.bb, 123), 2)
        self.assertEqual(hits(tt.bb, 23), 3)
        self.assertEqual(hits(tt.bb, tt.cc), 2)
        db.tt.drop()

        # string field contains string field
        db.define_table('tt', Field('aa'), Field('bb'))
        db.tt.insert(aa='aaa', bb='%aaa')
        db.tt.insert(aa='aaa', bb='aaa')
        self.assertEqual(hits(db.tt.aa, db.tt.bb), 1)
        db.tt.drop()

        # escaping: wildcards in the searched value must match literally
        db.define_table('tt', Field('aa'))
        for value in ('perc%ent', 'percent', 'percxyzent',
                      'under_score', 'underxscore', 'underyscore'):
            db.tt.insert(aa=value)
        self.assertEqual(hits(db.tt.aa, 'perc%ent'), 1)
        self.assertEqual(hits(db.tt.aa, 'under_score'), 1)
|
|
|
|
|
|
class TestLike(DALtest):
    """Exercise LIKE-style matching: ``like``, ``ilike``, ``startswith``,
    ``endswith`` and ``regexp``."""

    def setUp(self):
        """Connect and create table ``tt`` holding the single row ``'abc'``."""
        db = self.connect()
        db.define_table('tt', Field('aa'))
        # assertIsInstance reports the offending value on failure
        self.assertIsInstance(db.tt.insert(aa='abc'), long)
        self.db = db

    def testRun(self):
        """Match ``'abc'`` with ``%`` and ``_`` wildcards, with and without
        case sensitivity."""
        db = self.db

        def matches(pattern, **options):
            return db(db.tt.aa.like(pattern, **options)).count()

        self.assertEqual(matches('a%'), 1)
        self.assertEqual(matches('%b%'), 1)
        self.assertEqual(matches('%c'), 1)
        self.assertEqual(matches('%d%'), 0)
        self.assertEqual(matches('ab_'), 1)
        self.assertEqual(matches('a_c'), 1)
        self.assertEqual(matches('_bc'), 1)

        self.assertEqual(matches('A%', case_sensitive=False), 1)
        self.assertEqual(matches('%B%', case_sensitive=False), 1)
        self.assertEqual(matches('%C', case_sensitive=False), 1)
        self.assertEqual(db(db.tt.aa.ilike('A%')).count(), 1)
        self.assertEqual(db(db.tt.aa.ilike('%B%')).count(), 1)
        self.assertEqual(db(db.tt.aa.ilike('%C')).count(), 1)

        # DAL maps like() (and contains(), startswith(), endswith()) to the
        # LIKE operator, which in ANSI-SQL is case-sensitive.
        # Some backends are case-sensitive by default and some need
        # additional care to turn case-sensitivity on. To tell them apart,
        # compare the previously inserted 'abc' with 'ABC': if the result
        # is 0 the backend honours case-sensitivity, if it is 1 it does not.
        is_case_insensitive = matches('%ABC%')
        self.assertEqual(matches('A%'), is_case_insensitive)
        self.assertEqual(matches('%B%'), is_case_insensitive)
        self.assertEqual(matches('%C'), is_case_insensitive)

    def testUpperLower(self):
        """Apply ``like`` to the ``upper()``/``lower()`` form of the field."""
        db = self.db
        aa = db.tt.aa
        self.assertEqual(db(aa.upper().like('A%')).count(), 1)
        self.assertEqual(db(aa.upper().like('%B%')).count(), 1)
        self.assertEqual(db(aa.upper().like('%C')).count(), 1)
        self.assertEqual(db(aa.lower().like('%c')).count(), 1)

    def testStartsEndsWith(self):
        """Check ``startswith``/``endswith`` against ``'abc'``."""
        db = self.db
        aa = db.tt.aa
        self.assertEqual(db(aa.startswith('a')).count(), 1)
        self.assertEqual(db(aa.endswith('c')).count(), 1)
        self.assertEqual(db(aa.startswith('c')).count(), 0)
        self.assertEqual(db(aa.endswith('a')).count(), 0)

    def testEscaping(self):
        """Check explicit escape characters, literal LIKE patterns and the
        escaping done by ``startswith``/``endswith``."""
        db = self.db

        def matches(pattern, **options):
            return db(db.tt.aa.like(pattern, **options)).count()

        def refill(*values):
            for value in values:
                db.tt.insert(aa=value)

        term = 'ahbc'.replace('h', '\\')  # i.e. 'a', one backslash, 'bc'
        refill('a%bc', 'a_bc', term)
        self.assertEqual(matches('%ax%bc%', escape='x'), 1)
        self.assertEqual(matches('%ax_bc%', escape='x'), 1)
        self.assertEqual(matches('%' + term + '%'), 1)
        db(db.tt.id > 0).delete()

        # test "literal" like, i.e. exactly as LIKE in the backend
        refill('perc%ent', 'percent', 'percxyzent',
               'under_score', 'underxscore', 'underyscore')
        self.assertEqual(matches('%perc%ent%'), 3)
        self.assertEqual(matches('%under_score%'), 3)
        db(db.tt.id > 0).delete()

        # escaping with startswith and endswith
        refill('%percent', 'xpercent', 'discount%', 'discountx')
        self.assertEqual(db(db.tt.aa.endswith('discount%')).count(), 1)
        self.assertEqual(matches('discount%%'), 2)
        self.assertEqual(db(db.tt.aa.startswith('%percent')).count(), 1)
        self.assertEqual(matches('%%percent'), 2)

    @unittest.skipIf(IS_MSSQL, "No Regexp on MSSQL")
    def testRegexp(self):
        """Check ``regexp`` alone and combined with other conditions.

        The remaining checks are skipped silently when the first regexp
        query raises ``NotImplementedError``.
        """
        db = self.db
        db(db.tt.id > 0).delete()
        for value in ('%percent', 'xpercent', 'discount%', 'discountx'):
            db.tt.insert(aa=value)
        aa = db.tt.aa
        # Only this query is guarded; the assertions stay outside the try.
        try:
            plain_matches = db(aa.regexp('count')).count()
        except NotImplementedError:
            return
        self.assertEqual(plain_matches, 2)
        self.assertEqual(db(aa.lower().regexp('count')).count(), 2)
        self.assertEqual(db(aa.upper().regexp('COUNT') &
                            aa.lower().regexp('count')).count(), 2)
        self.assertEqual(db(aa.upper().regexp('COUNT') |
                            (aa.lower() == 'xpercent')).count(), 3)

    def testLikeInteger(self):
        """Apply ``like`` patterns to an integer field."""
        db = self.db
        db.tt.drop()
        db.define_table('tt', Field('aa', 'integer'))
        self.assertIsInstance(db.tt.insert(aa=1111111111), long)
        self.assertIsInstance(db.tt.insert(aa=1234567), long)

        def matches(pattern):
            return db(db.tt.aa.like(pattern)).count()

        self.assertEqual(matches('1%'), 2)
        self.assertEqual(matches('1_3%'), 1)
        self.assertEqual(matches('2%'), 0)
        self.assertEqual(matches('_2%'), 1)
        self.assertEqual(matches('12%'), 1)
        self.assertEqual(matches('012%'), 0)
        self.assertEqual(matches('%45%'), 1)
        self.assertEqual(matches('%54%'), 0)
|
|
|
|
class TestDatetime(DALtest):
    """Exercise ``datetime``, ``time`` and ``date`` fields."""

    def testRun(self):
        """Query three stored datetimes by component, then round-trip one
        ``time`` and one ``date`` value."""
        db = self.connect()

        db.define_table('tt', Field('aa', 'datetime'))
        stamps = (
            datetime.datetime(1971, 12, 21, 11, 30),
            datetime.datetime(1971, 11, 21, 10, 30),
            datetime.datetime(1970, 12, 21, 9, 31),
        )
        for expected_id, stamp in enumerate(stamps, 1):
            self.assertEqual(db.tt.insert(aa=stamp), expected_id)

        aa = db.tt.aa

        def count(query):
            return db(query).count()

        self.assertEqual(count(aa == stamps[0]), 1)
        self.assertEqual(count(aa.year() == 1971), 2)
        self.assertEqual(count(aa.month() > 11), 2)
        self.assertEqual(count(aa.day() >= 21), 3)
        self.assertEqual(count(aa.hour() < 10), 1)
        self.assertEqual(count(aa.minutes() <= 30), 2)
        self.assertEqual(count(aa.seconds() != 31), 3)
        # exactly one row is expected below 365 days' worth of epoch seconds
        self.assertEqual(db(aa.epoch() < 365*24*3600).delete(), 1)
        db.tt.drop()

        db.define_table('tt', Field('aa', 'time'))
        stored_time = datetime.time(10, 30, 55)
        db.tt.insert(aa=stored_time)
        self.assertEqual(db().select(db.tt.aa)[0].aa, stored_time)
        db.tt.drop()

        db.define_table('tt', Field('aa', 'date'))
        stored_date = datetime.date.today()
        db.tt.insert(aa=stored_date)
        self.assertEqual(db().select(db.tt.aa)[0].aa, stored_date)
|
|
|
|
|
|
class TestExpressions(DALtest):
    """Exercise expressions in updates, filters, aggregates and slices."""

    @unittest.skipIf(IS_POSTGRESQL, "PG8000 does not like these")
    def testRun(self):
        """Run expression-based updates, counts, aggregates, ``abs()``,
        ``case()`` and deletes against one three-row table.

        The statements build on each other's results, so their order
        matters.
        """
        db = self.connect()
        db.define_table('tt', Field('aa', 'integer'),
                        Field('bb', 'integer'), Field('cc'))
        aa, bb, cc = db.tt.aa, db.tt.bb, db.tt.cc
        self.assertEqual(db.tt.insert(aa=1, bb=0), 1)
        self.assertEqual(db.tt.insert(aa=2, bb=0), 2)
        self.assertEqual(db.tt.insert(aa=3, bb=0), 3)

        # test update
        self.assertEqual(db(aa == 3).update(aa=aa + 1, bb=bb + 2), 1)
        self.assertEqual(db(aa == 4).count(), 1)
        self.assertEqual(db(bb == 2).count(), 1)
        self.assertEqual(db(aa == -2).count(), 0)
        self.assertEqual(db(aa == 4).update(aa=aa * 2, bb=5), 1)
        self.assertEqual(db(bb == 5).count(), 1)
        self.assertEqual(db(aa + 1 == 9).count(), 1)
        self.assertEqual(db(aa + 1 == 9).update(aa=aa - 2, cc='cc'), 1)
        self.assertEqual(db(cc == 'cc').count(), 1)
        self.assertEqual(db(aa == 6).count(), 1)
        self.assertEqual(db(aa == 6).update(bb=aa * (bb - 3)), 1)
        self.assertEqual(db(bb == 12).count(), 1)
        self.assertEqual(db(aa == 6).count(), 1)
        self.assertEqual(db(aa == 6).update(aa=aa % 4 + 1,
                                            cc=cc + '1' + '1'), 1)
        self.assertEqual(db(cc == 'cc11').count(), 1)
        self.assertEqual(db(aa == 3).count(), 1)

        # test comparison expression based count
        self.assertEqual(db(aa != aa).count(), 0)
        self.assertEqual(db(aa == aa).count(), 3)

        # test select aggregations
        # (locals are named so that they do not shadow sum/min/max/abs)
        total = (aa + 1).sum()
        self.assertEqual(db(aa + 1 >= 3).select(total).first()[total], 7)
        self.assertEqual(db((1==0) & (aa >= aa)).count(), 0)
        self.assertEqual(db(aa * 2 == -2).select(total).first()[total], None)

        how_many = aa.count()
        average = aa.avg()
        lowest = aa.min()
        highest = aa.max()
        result = db(db.tt).select(
            total, how_many, average, lowest, highest).first()
        self.assertEqual(result[total], 9)
        self.assertEqual(result[how_many], 3)
        self.assertEqual(result[average], 2)
        self.assertEqual(result[lowest], 1)
        self.assertEqual(result[highest], 3)

        # Test basic expressions evaluated at python level
        self.assertEqual(db((1==1) & (aa >= 2)).count(), 2)
        self.assertEqual(db((1==1) | (aa >= 2)).count(), 3)
        self.assertEqual(db((1==0) & (aa >= 2)).count(), 0)
        self.assertEqual(db((1==0) | (aa >= 2)).count(), 2)

        # test abs()
        self.assertEqual(db(aa == 2).update(aa=aa*-10), 1)
        absolute = aa.abs().with_alias('abs')
        result = db(aa == -20).select(absolute).first()
        self.assertEqual(result[absolute], 20)
        self.assertEqual(result['abs'], 20)
        scaled = aa.abs()/10+5
        exp = scaled.min()*2+1
        result = db(aa == -20).select(exp).first()
        self.assertEqual(result[exp], 15)

        # test case()
        condition = aa > 2
        case = condition.case(aa + 2, aa - 2)
        my_case = case.with_alias('my_case')
        result = db().select(my_case)
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0][my_case], -1)
        self.assertEqual(result[0]['my_case'], -1)
        self.assertEqual(result[1]['my_case'], -22)
        self.assertEqual(result[2]['my_case'], 5)

        # test expression based delete
        self.assertEqual(db(aa + 1 >= 4).count(), 1)
        self.assertEqual(db(aa + 1 >= 4).delete(), 1)
        self.assertEqual(db(aa).count(), 2)

    def testUpdate(self):
        """For each field type, insert a value, read it back through an
        expression filter, update it and read it back again."""
        db = self.connect()

        # some db's only support seconds
        now = datetime.datetime.today().replace(microsecond=0)
        # taken once so both 'date' values derive from the same day
        today = datetime.date.today()
        one_day = datetime.timedelta(1)
        one_sec = datetime.timedelta(0, 1)

        # (field type, value inserted, value written by the update)
        update_vals = (
            ('string', 'x', 'y'),
            ('text', 'x', 'y'),
            ('password', 'x', 'y'),
            ('integer', 1, 2),
            ('bigint', 1, 2),
            ('float', 1.0, 2.0),
            ('double', 1.0, 2.0),
            ('boolean', True, False),
            ('date', today, today + one_day),
            ('datetime', datetime.datetime(1971, 12, 21, 10, 30, 55, 0), now),
            ('time', now.time(), (now + one_sec).time()),
        )

        for field_type, initial, updated in update_vals:
            db.define_table('tt', Field('aa', 'integer', default=0),
                            Field('bb', field_type))
            self.assertIsInstance(db.tt.insert(bb=initial), long)
            self.assertEqual(
                db(db.tt.aa + 1 == 1).select(db.tt.bb)[0].bb, initial)
            self.assertEqual(db(db.tt.aa + 1 == 1).update(bb=updated), 1)
            self.assertEqual(
                db(db.tt.aa / 3 == 0).select(db.tt.bb)[0].bb, updated)
            db.tt.drop()

    def testSubstring(self):
        """Check that slicing and indexing a string expression gives the
        same result as the equivalent Python slice."""
        db = self.connect()
        t0 = db.define_table('t0', Field('name'))
        input_name = "web2py"
        t0.insert(name=input_name)
        exp_slice = t0.name.lower()[4:6]
        exp_slice_no_max = t0.name.lower()[4:]
        exp_slice_neg_max = t0.name.lower()[2:-2]
        exp_slice_neg_start = t0.name.lower()[-2:]
        exp_item = t0.name.lower()[3]
        out = db(t0).select(
            exp_slice, exp_item, exp_slice_no_max,
            exp_slice_neg_max, exp_slice_neg_start).first()
        self.assertEqual(out[exp_slice], input_name[4:6])
        self.assertEqual(out[exp_item], input_name[3])
        self.assertEqual(out[exp_slice_no_max], input_name[4:])
        self.assertEqual(out[exp_slice_neg_max], input_name[2:-2])
        self.assertEqual(out[exp_slice_neg_start], input_name[-2:])

    def testOps(self):
        """Combine aggregate expressions arithmetically and select them."""
        db = self.connect()
        t0 = db.define_table('t0', Field('vv', 'integer'))
        self.assertEqual(db.t0.insert(vv=1), 1)
        self.assertEqual(db.t0.insert(vv=2), 2)
        self.assertEqual(db.t0.insert(vv=3), 3)
        total = db.t0.vv.sum()
        how_many = db.t0.vv.count()
        average = db.t0.vv.avg()
        op = total/how_many
        op1 = (total/how_many).with_alias('tot')
        self.assertEqual(db(t0).select(op).first()[op], 2)
        self.assertEqual(db(t0).select(op1).first()[op1], 2)
        self.assertEqual(db(t0).select(op1).first()['tot'], 2)
        op2 = average*how_many
        self.assertEqual(db(t0).select(op2).first()[op2], 6)
        # Selecting the quotient of two aliased aggregates is not possible
        # at least on sqlite, so the expression is only built, not queried.
        aliased_total = db.t0.vv.sum().with_alias('s')
        aliased_count = db.t0.vv.count().with_alias('c')
        op = aliased_total/aliased_count
|
|
|
|
|
|
class TestTableAliasing(DALtest):
    """Exercise ``Table.with_alias`` for inserts, selects, updates and
    deletes."""

    def testRun(self):
        """Check alias metadata, then run CRUD through two aliased tables.

        On SQLite the update and delete through an alias are expected to
        raise ``SyntaxError``.
        """
        db = self.connect()
        db.define_table('t1', Field('aa'))
        db.define_table(
            't2',
            Field('pk', type='id', unique=True, notnull=True),
            Field('bb', type='integer'),
            rname='tt')
        tab1 = db.t1.with_alias('test1')
        tab2 = db.t2.with_alias('test2')

        def ordered(table, field):
            return db(table).select(field, orderby=field).as_list()

        self.assertIs(tab2.id, tab2.pk)
        self.assertIs(tab2._id, tab2.pk)
        self.assertEqual(tab1._dalname, 't1')
        self.assertEqual(tab1._tablename, 'test1')
        self.assertEqual(tab2._dalname, 't2')
        self.assertEqual(tab2._tablename, 'test2')
        self.assertEqual(tab2._rname, 'tt')

        tab1.insert(aa='foo')
        tab1.insert(aa='bar')
        self.assertEqual(ordered(tab1, tab1.aa), [{'aa': 'bar'}, {'aa': 'foo'}])

        if IS_SQLITE:
            with self.assertRaises(SyntaxError):
                db(tab1.aa == 'foo').update(aa='baz')
            with self.assertRaises(SyntaxError):
                db(tab1.aa == 'bar').delete()
        else:
            db(tab1.aa == 'foo').update(aa='baz')
            self.assertEqual(ordered(tab1, tab1.aa),
                             [{'aa': 'bar'}, {'aa': 'baz'}])
            db(tab1.aa == 'bar').delete()
            self.assertEqual(ordered(tab1, tab1.aa), [{'aa': 'baz'}])

        tab2.insert(bb=123)
        tab2.insert(bb=456)
        self.assertEqual(ordered(tab2, tab2.bb), [{'bb': 123}, {'bb': 456}])

        if IS_SQLITE:
            with self.assertRaises(SyntaxError):
                db(tab2.bb == 456).update(bb=789)
            with self.assertRaises(SyntaxError):
                db(tab2.bb == 123).delete()
        else:
            db(tab2.bb == 456).update(bb=789)
            self.assertEqual(ordered(tab2, tab2.bb),
                             [{'bb': 123}, {'bb': 789}])
            db(tab2.bb == 123).delete()
            self.assertEqual(ordered(tab2, tab2.bb), [{'bb': 789}])
|
|
|
|
class TestJoin(DALtest):
    """Exercise inner joins, left joins and grouped left joins."""

    def testRun(self):
        """Join ``t1`` with its referencing table ``t2`` three ways, then
        check a ``person``/``dog`` reference.

        Each distinct select is run once and its rows are reused by the
        assertions that follow it.
        """
        db = self.connect()
        db.define_table('t1', Field('aa'))
        db.define_table('t2', Field('aa'), Field('b', db.t1))
        i1 = db.t1.insert(aa='1')
        i2 = db.t1.insert(aa='2')
        db.t1.insert(aa='3')  # no t2 row references this one
        db.t2.insert(aa='4', b=i1)
        db.t2.insert(aa='5', b=i2)
        db.t2.insert(aa='6', b=i2)

        linked = db.t1.id == db.t2.b
        by_both = db.t1.aa | db.t2.aa

        # inner join
        inner = db(linked).select(orderby=by_both)
        self.assertEqual(len(inner), 3)
        self.assertEqual(inner[2].t1.aa, '2')
        self.assertEqual(inner[2].t2.aa, '6')

        # left join: the unreferenced t1 row shows up with empty t2 fields
        outer = db().select(db.t1.ALL, db.t2.ALL,
                            left=db.t2.on(linked),
                            orderby=by_both)
        self.assertEqual(len(outer), 4)
        self.assertEqual(outer[2].t1.aa, '2')
        self.assertEqual(outer[2].t2.aa, '6')
        self.assertEqual(outer[3].t1.aa, '3')
        self.assertEqual(outer[3].t2.aa, None)

        # left join grouped by parent, counting the children
        children = db.t2.id.count()
        grouped = db().select(db.t1.aa, children,
                              left=db.t2.on(linked),
                              orderby=db.t1.aa,
                              groupby=db.t1.aa)
        self.assertEqual(len(grouped), 3)
        self.assertEqual(grouped[0]._extra[children], 1)
        self.assertEqual(grouped[1]._extra[children], 2)
        self.assertEqual(grouped[2]._extra[children], 0)
        db.t2.drop()
        db.t1.drop()

        db.define_table('person', Field('name'))
        # named so that it does not shadow the builtin id()
        person_ref = db.person.insert(name="max")
        self.assertEqual(person_ref.name, 'max')
        db.define_table('dog', Field('name'),
                        Field('ownerperson', 'reference person'))
        db.dog.insert(name='skipper', ownerperson=1)
        row = db(db.person.id == db.dog.ownerperson).select().first()
        self.assertEqual(row[db.person.name], 'max')
        self.assertEqual(row['person.name'], 'max')
        db.dog.drop()
        # dropping the referencing table must leave no back-reference
        self.assertEqual(len(db.person._referenced_by), 0)
|
|
|
|
|
|
class TestMinMaxSumAvg(DALtest):
    """Exercise the ``min``, ``max``, ``sum``, ``count`` and ``avg``
    aggregates."""

    def testRun(self):
        """Aggregate an integer column holding 1, 2 and 3."""
        db = self.connect()
        db.define_table('tt', Field('aa', 'integer'))
        for value in (1, 2, 3):
            self.assertEqual(db.tt.insert(aa=value), value)

        aa = db.tt.aa

        def aggregate(expression):
            return db().select(expression).first()[expression]

        lowest = aa.min()
        # the same aggregate read through _extra, through first() and
        # without any filter
        self.assertEqual(db(db.tt.id > 0).select(lowest)[0]._extra[lowest], 1)
        self.assertEqual(db(db.tt.id > 0).select(lowest).first()[lowest], 1)
        self.assertEqual(aggregate(lowest), 1)

        self.assertEqual(aggregate(aa.max()), 3)
        self.assertEqual(aggregate(aa.sum()), 6)
        self.assertEqual(aggregate(aa.count()), 3)
        self.assertEqual(aggregate(aa.avg()), 2)
|
|
|
|
|
|
class TestMigrations(unittest.TestCase):
    """Exercise table migrations driven by ``migrate=`` metadata files."""

    def testRun(self):
        """Redefine ``tt`` and ``t1`` (rname ``foo``) with a changing second
        field, reconnecting before every step, and drop both at the end."""
        # positional Field() arguments of the columns defined next to 'aa'
        schema_steps = (
            [('BB',)],
            [],
            [('b',)],
            [('b', 'text')],
            [],
        )
        table_options = (
            ('tt', dict(migrate='.storage.table')),
            ('t1', dict(migrate='.storage.rname', rname='foo')),
        )
        last_step = len(schema_steps) - 1
        for step, extra_columns in enumerate(schema_steps):
            db = DAL(DEFAULT_URI, check_reserved=['all'])
            for tablename, options in table_options:
                columns = [Field('aa')]
                columns.extend(Field(*args) for args in extra_columns)
                db.define_table(tablename, *columns, **options)
            if step == last_step:
                db.tt.drop()
                db.t1.drop()
            db.commit()
            db.close()

    def testFieldRName(self):
        """Run a similar migration sequence with explicit field rnames,
        checking after each step that rows can be written and, once stored,
        that the ``integrity`` row is still readable."""
        table_options = (
            ('tt', dict(migrate='.storage.table')),
            ('t1', dict(migrate='.storage.rname', rname='foo')),
        )
        # row stored before the type/rname changes and re-read afterwards
        integrity = dict(aa='data', b='integrity')

        def reconnect(*extra_columns):
            # extra_columns: (args, kwargs) pairs for Field() next to 'aa'
            db = DAL(DEFAULT_URI, check_reserved=['all'])
            for tablename, options in table_options:
                columns = [Field('aa', rname='faa')]
                columns.extend(
                    Field(*args, **kwargs) for args, kwargs in extra_columns)
                db.define_table(tablename, *columns, **options)
            return db

        def check_write(db, table, data):
            # insert, read back the same columns, compare, then delete
            rowid = table.insert(**data)
            query = (table._id == rowid)
            fields = [table[x] for x in data.keys()]
            row = db(query).select(*fields).first()
            self.assertIsNot(row, None)
            self.assertEqual(row.as_dict(), data)
            db(query).delete()

        def check_write_both(db, data):
            check_write(db, db.tt, data)
            check_write(db, db.t1, data)

        def check_integrity(db):
            for table in (db.tt, db.t1):
                fields = [table[x] for x in integrity.keys()]
                row = db(table).select(*fields).first()
                self.assertIsNot(row, None)
                self.assertEqual(row.as_dict(), integrity)

        def finish(db):
            db.commit()
            db.close()

        # Create tables
        db = reconnect((('BB',), dict(rname='fbb')))
        check_write_both(db, dict(aa='aa1', BB='BB1'))
        finish(db)

        # Drop field defined by CREATE TABLE
        db = reconnect()
        check_write_both(db, dict(aa='aa2'))
        finish(db)

        # Add new field
        db = reconnect((('b',), dict(rname='fb')))
        check_write_both(db, dict(aa='aa3', b='b3'))
        db.tt.insert(**integrity)
        db.t1.insert(**integrity)
        finish(db)

        # Change field type
        db = reconnect((('b', 'text'), dict(rname='fb')))
        check_write_both(db, dict(aa='aa4', b='b4'))
        check_integrity(db)
        finish(db)

        # Change field rname
        db = reconnect((('b', 'text'), dict(rname='xb')))
        check_write_both(db, dict(aa='aa4', b='b4'))
        check_integrity(db)
        finish(db)

        # Drop field defined by ALTER TABLE
        db = reconnect()
        check_write_both(db, dict(aa='aa5'))
        db.tt.drop()
        db.t1.drop()
        finish(db)

    def tearDown(self):
        """Remove the storage files named by the tests, if present."""
        for leftover in ('.storage.db', '.storage.table', '.storage.rname'):
            if os.path.exists(leftover):
                os.unlink(leftover)
|
|
|
|
|
|
class TestReference(DALtest):
    """Exercise a self-referencing field under several ``ondelete``
    actions."""

    def testRun(self):
        """For each (bigint_id, ondelete) scenario, build a self-referencing
        table, follow the reference chain and check what a delete leaves
        behind.

        On MSSQL the adapter's reference types are patched to NO ACTION and
        the delete checks are skipped.
        """
        is_mssql = DEFAULT_URI.startswith('mssql')
        # rows expected to remain after deleting the first record
        rows_left = {
            'SET NULL': 2,
            'NO ACTION': 2,
            'CASCADE': 1,
        }
        scenarios = (
            (True, 'CASCADE'),
            (False, 'CASCADE'),
            (False, 'SET NULL'),
        )
        for bigint_id, ondelete in scenarios:
            db = self.connect(bigint_id=bigint_id)
            if is_mssql:
                # multiple cascade gotcha
                for key in ['reference', 'reference FK']:
                    db._adapter.types[key] = db._adapter.types[key].replace(
                        '%(on_delete_action)s', 'NO ACTION')
            db.define_table('tt', Field('name'),
                            Field('aa', 'reference tt', ondelete=ondelete))
            db.commit()

            first = db.tt.insert(name='xxx')
            self.assertEqual(first.id, 1)
            self.assertEqual(first['id'], 1)
            first.aa = first  # make the record reference itself
            self.assertEqual(first.aa, 1)
            first.update_record()

            reloaded = db.tt[1]
            self.assertEqual(reloaded.aa, 1)
            self.assertEqual(reloaded.aa.aa.aa.aa.aa.aa.name, 'xxx')
            second = db.tt.insert(name='yyy', aa=reloaded)
            self.assertEqual(second.aa, reloaded.id)

            if not is_mssql:
                self.assertEqual(db.tt.insert(name='zzz'), 3)
                self.assertEqual(db(db.tt.name).count(), 3)
                db(db.tt.id == first).delete()
                self.assertEqual(db(db.tt.name).count(), rows_left[ondelete])
                if ondelete == 'SET NULL':
                    self.assertEqual(
                        db(db.tt.name == 'yyy').select()[0].aa, None)

            # tear down explicitly before the next scenario reconnects
            self.tearDown()
|
|
|
|
|
|
class TestClientLevelOps(DALtest):
    """Exercise operations done on ``Rows`` objects after the select."""

    def testRun(self):
        """Compare and combine ``Rows``, render them through representers
        and check the effect of ``compact`` on row keys."""
        db = self.connect()
        db.define_table(
            'tt',
            Field('aa', represent=lambda x, r: 'x' + x),
            Field('bb', type='integer', represent=lambda x, r: 'y' + str(x)))
        db.commit()
        db.tt.insert(aa="test", bb=1)

        no_rows = db(db.tt.id < 0).select()
        all_rows = db(db.tt.id > 0).select()
        self.assertNotEqual(no_rows, all_rows)

        left = db(db.tt.id > 0).select()
        right = db(db.tt.id > 0).select()
        self.assertEqual(left, right)
        concatenated = left + right
        self.assertEqual(len(concatenated), 2)
        union = left | right
        self.assertEqual(len(union), 1)
        intersection = left & right
        self.assertEqual(len(intersection), 1)
        found = left.find(lambda row: row.aa == "test")
        self.assertEqual(len(found), 1)
        excluded = right.exclude(lambda row: row.aa == "test")
        self.assertEqual(len(excluded), 1)
        ordered = intersection.sort(lambda row: row.aa)
        self.assertEqual(len(ordered), 1)

        def represent(f, v, r):
            return 'z' + str(v)

        db.representers = {
            'rows_render': represent,
        }
        db.tt.insert(aa="foo", bb=2)
        rows = db(db.tt.id > 0).select()
        first_id, second_id = rows[0]['id'], rows[1]['id']
        # expected output with both fields, only aa, and only bb rendered
        both_rendered = [Row(aa='ztest', bb='z1', id=first_id),
                         Row(aa='zfoo', bb='z2', id=second_id)]
        aa_rendered = [Row(aa='ztest', bb=1, id=first_id),
                       Row(aa='zfoo', bb=2, id=second_id)]
        bb_rendered = [Row(aa='test', bb='z1', id=first_id),
                       Row(aa='foo', bb='z2', id=second_id)]
        aa, bb = db.tt.aa, db.tt.bb
        self.assertEqual(rows.render(i=0), both_rendered[0])
        self.assertEqual(rows.render(i=0, fields=[aa, bb]), both_rendered[0])
        self.assertEqual(rows.render(i=0, fields=[aa]), aa_rendered[0])
        self.assertEqual(rows.render(i=0, fields=[bb]), bb_rendered[0])
        self.assertEqual(list(rows.render()), both_rendered)
        self.assertEqual(list(rows.render(fields=[aa, bb])), both_rendered)
        self.assertEqual(list(rows.render(fields=[aa])), aa_rendered)
        self.assertEqual(list(rows.render(fields=[bb])), bb_rendered)
        rows.render(i=0)  # result unused; the call itself is kept

        rows = db(db.tt.id > 0).select()
        rows.compact = False
        row = rows[0]
        self.assertIn('tt', row)
        self.assertIn('id', row.tt)
        self.assertNotIn('id', row)
        rows.compact = True
        row = rows[0]
        self.assertNotIn('tt', row)
        self.assertIn('id', row)

        # an aggregate-only select is keyed by '_extra' either way
        for compact in (False, True):
            rows = db(db.tt.id > 0).select(db.tt.id.max())
            rows.compact = compact
            row = rows[0]
            self.assertNotIn('tt', row)
            self.assertIn('_extra', row)
        db.tt.drop()

        db.define_table('tt', Field('aa'),
                        Field.Virtual('bb', lambda row: ':p'))
        db.tt.insert(aa="test")
        rows = db(db.tt.id > 0).select()
        row = rows.first()
        self.assertNotIn('tt', row)
        self.assertIn('id', row)
        self.assertIn('bb', row)

        rows.compact = False
        row = rows.first()
        self.assertIn('tt', row)
        self.assertEqual(len(row.keys()), 1)
        self.assertIn('id', row.tt)
        self.assertIn('bb', row.tt)
        self.assertNotIn('id', row)
        self.assertNotIn('bb', row)
|
|
|
|
|
|
class TestVirtualFields(DALtest):
    """Exercise class-based virtual fields attached via
    ``table.virtualfields``."""

    def testRun(self):
        """Append a virtual-fields object exposing ``a_upper`` and read it
        from a selected row."""
        db = self.connect()
        db.define_table('tt', Field('aa'))
        db.commit()
        db.tt.insert(aa="test")

        class Compute:
            # NOTE(review): the sole parameter takes the place of `self`;
            # presumably pydal exposes the row's tables on the instance.
            def a_upper(row):
                return row.tt.aa.upper()

        db.tt.virtualfields.append(Compute())
        # a bare assert is stripped under -O; use the TestCase assertion
        # like the rest of the module
        self.assertEqual(db(db.tt.id > 0).select().first().a_upper, 'TEST')
|
|
|
|
|
|
class TestComputedFields(DALtest):
    """Fields declared with ``compute=`` get their value on insert."""

    def testRun(self):
        """Insert only ``aa`` and read back the computed columns."""
        db = self.connect()
        db.define_table(
            'tt',
            Field('aa'),
            Field('bb', default='x'),
            Field('cc', compute=lambda r: r.aa + r.bb),
        )
        db.commit()
        row_id = db.tt.insert(aa="z")
        stored = db.tt[row_id]
        self.assertEqual(stored.cc, 'zx')
        db.tt.drop()
        db.commit()

        # test checking that a compute field can refer to earlier-defined computed fields
        db.define_table(
            'tt',
            Field('aa'),
            Field('bb', default='x'),
            Field('cc', compute=lambda r: r.aa + r.bb),
            Field('dd', compute=lambda r: r.bb + r.cc),
        )
        db.commit()
        row_id = db.tt.insert(aa="z")
        stored = db.tt[row_id]
        self.assertEqual(stored.dd, 'xzx')
|
|
|
|
|
|
class TestCommonFilters(DALtest):
    """Check ``Table._common_filter`` against counts, joins, delete and update."""

    def testRun(self):
        """Install filters on ``t1``/``t2`` and compare visible row counts."""
        db = self.connect()
        db.define_table('t1', Field('aa', 'integer'))
        db.define_table('t2', Field('aa', 'integer'), Field('b', db.t1))
        first_parent = db.t1.insert(aa=1)
        second_parent = db.t1.insert(aa=2)
        db.t1.insert(aa=3)
        db.t2.insert(aa=4, b=first_parent)
        db.t2.insert(aa=5, b=second_parent)
        db.t2.insert(aa=6, b=second_parent)

        def t1_above_one(_query):
            return db.t1.aa > 1

        def t2_below_six(_query):
            return db.t2.aa < 6

        db.t1._common_filter = t1_above_one
        # Each count below is issued twice in a row with the same expectation.
        for _ in range(2):
            self.assertEqual(db(db.t1).count(), 2)
        joined = db.t2.b == db.t1.id
        for _ in range(2):
            self.assertEqual(db(joined).count(), 2)
        self.assertEqual(len(db(db.t1).select(left=db.t2.on(joined))), 3)

        db.t2._common_filter = t2_below_six
        for _ in range(2):
            self.assertEqual(db(joined).count(), 1)
        self.assertEqual(len(db(db.t1).select(left=db.t2.on(joined))), 2)

        # test delete
        self.assertEqual(db(db.t2).count(), 2)
        db(db.t2).delete()
        self.assertEqual(db(db.t2).count(), 0)
        db.t2._common_filter = None
        self.assertEqual(db(db.t2).count(), 1)

        # test update
        db.t2.insert(aa=4, b=first_parent)
        db.t2.insert(aa=5, b=second_parent)
        db.t2._common_filter = t2_below_six
        self.assertEqual(db(db.t2).count(), 2)
        db(db.t2).update(aa=6)
        self.assertEqual(db(db.t2).count(), 0)
        db.t2._common_filter = None
        self.assertEqual(db(db.t2).count(), 3)
|
|
|
|
|
|
class TestImportExportFields(DALtest):
    """Export the database to CSV, wipe it, and import it back."""

    def testRun(self):
        """After the round trip, 10 person/pet pairs still match by id and name."""
        db = self.connect()
        db.define_table('person', Field('name'))
        db.define_table('pet', Field('friend', db.person), Field('name'))
        # Populate twice, clearing both tables before each pass.
        for _ in range(2):
            db(db.pet).delete()
            db(db.person).delete()
            for k in range(10):
                # ``person_id`` rather than ``id``: do not shadow the builtin.
                person_id = db.person.insert(name=str(k))
                db.pet.insert(friend=person_id, name=str(k))
        db.commit()
        stream = StringIO()
        db.export_to_csv_file(stream)
        db(db.pet).delete()
        db(db.person).delete()
        # Re-wrap the exported text so the import reads from the start.
        stream = StringIO(stream.getvalue())
        db.import_from_csv_file(stream)
        # unittest assertion: not stripped under ``python -O`` and reports
        # the actual count on failure.
        self.assertEqual(
            db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count(),
            10)
|
|
|
|
|
|
class TestImportExportUuidFields(DALtest):
    """CSV export followed by import into the same, non-emptied tables."""

    def testRun(self):
        """``person`` has a ``uuid`` column; counts are checked after re-import.

        The test expects 10 ``person`` rows and 20 person/pet pairs matching
        by id and name once the exported CSV has been imported on top of the
        existing data.
        """
        db = self.connect()
        db.define_table('person', Field('name'), Field('uuid'))
        db.define_table('pet', Field('friend', db.person), Field('name'))
        # Populate twice, clearing both tables before each pass.
        for _ in range(2):
            db(db.pet).delete()
            db(db.person).delete()
            for k in range(10):
                # ``person_id`` rather than ``id``: do not shadow the builtin.
                person_id = db.person.insert(name=str(k), uuid=str(k))
                db.pet.insert(friend=person_id, name=str(k))
        db.commit()
        stream = StringIO()
        db.export_to_csv_file(stream)
        # Re-wrap the exported text so the import reads from the start.
        stream = StringIO(stream.getvalue())
        db.import_from_csv_file(stream)
        # unittest assertions: not stripped under ``python -O`` and report
        # the actual counts on failure.
        self.assertEqual(db(db.person).count(), 10)
        self.assertEqual(
            db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count(),
            20)
|
|
|
|
|
|
class TestDALDictImportExport(unittest.TestCase):
    """Rebuild ``DAL`` instances from ``as_dict``/``as_json`` output and plain dicts."""

    def testRun(self):
        """Serialize a DAL, rebuild it with ``DAL(**...)``, and inspect the result.

        All checks use unittest assertions (instead of bare ``assert``) so
        they are not stripped under ``python -O`` and report the offending
        values on failure.
        """
        db = DAL(DEFAULT_URI, check_reserved=['all'])
        db.define_table('person', Field('name', default="Michael"), Field('uuid'))
        db.define_table('pet', Field('friend', db.person), Field('name'))
        dbdict = db.as_dict(flat=True, sanitize=False)
        self.assertIsInstance(dbdict, dict)
        uri = dbdict["uri"]
        self.assertIsInstance(uri, basestring)
        self.assertTrue(uri)
        self.assertEqual(len(dbdict["tables"]), 2)
        self.assertEqual(len(dbdict["tables"][0]["fields"]), 3)
        self.assertEqual(dbdict["tables"][0]["fields"][1]["type"],
                         db.person.name.type)
        self.assertEqual(dbdict["tables"][0]["fields"][1]["default"],
                         db.person.name.default)

        # Rebuild from the flat dict.
        db2 = DAL(**dbdict)
        self.assertEqual(len(db.tables), len(db2.tables))
        self.assertTrue(hasattr(db2, "pet"))
        self.assertIsInstance(db2.pet, Table)
        self.assertTrue(hasattr(db2.pet, "friend"))
        self.assertIsInstance(db2.pet.friend, Field)
        db.pet.drop()
        db.commit()

        db2.commit()

        # Rebuild from the JSON form.
        dbjson = db.as_json(sanitize=False)
        self.assertIsInstance(dbjson, basestring)
        self.assertGreater(len(dbjson), 0)
        db3 = DAL(**json.loads(dbjson))
        self.assertTrue(hasattr(db3, "person"))
        self.assertTrue(hasattr(db3.person, "uuid"))
        self.assertEqual(db3.person.uuid.type, db.person.uuid.type)
        db3.person.drop()
        db3.commit()
        db3.close()

        # Build from a hand-written dict with defaults and a reference field.
        mpfc = "Monty Python's Flying Circus"
        dbdict4 = {"uri": DEFAULT_URI,
                   "tables": [{"tablename": "tvshow",
                               "fields": [{"fieldname": "name",
                                           "default": mpfc},
                                          {"fieldname": "rating",
                                           "type": "double"}]},
                              {"tablename": "staff",
                               "fields": [{"fieldname": "name",
                                           "default": "Michael"},
                                          {"fieldname": "food",
                                           "default": "Spam"},
                                          {"fieldname": "tvshow",
                                           "type": "reference tvshow"}]}]}
        db4 = DAL(**dbdict4)
        self.assertIn("staff", db4.tables)
        self.assertIn("name", db4.staff)
        self.assertEqual(db4.tvshow.rating.type, "double")
        inserted = (db4.tvshow.insert(), db4.tvshow.insert(name="Loriot"),
                    db4.tvshow.insert(name="Il Mattatore"))
        self.assertEqual(inserted, (1, 2, 3))
        self.assertEqual(db4(db4.tvshow).select().first().id, 1)
        self.assertEqual(db4(db4.tvshow).select().first().name, mpfc)

        db4.staff.drop()
        db4.tvshow.drop()
        db4.commit()

        # A dict holding only the uri gives a DAL without tables.
        dbdict5 = {"uri": DEFAULT_URI}
        db5 = DAL(**dbdict5)
        self.assertIn(db5.tables, ([], None))
        self.assertNotIn(str(db5), ("", None))

        # A table entry without a "fields" key.
        dbdict6 = {"uri": DEFAULT_URI,
                   "tables": [{"tablename": "staff"},
                              {"tablename": "tvshow",
                               "fields": [{"fieldname": "name"},
                                          {"fieldname": "rating", "type": "double"}
                                          ]
                               }]
                   }
        db6 = DAL(**dbdict6)

        self.assertEqual(len(db6["staff"].fields), 1)
        self.assertIn("name", db6["tvshow"].fields)

        self.assertIsNotNone(db6.staff.insert())
        self.assertEqual(db6(db6.staff).select().first().id, 1)

        db6.staff.drop()
        db6.tvshow.drop()
        db6.commit()

        db.close()
        db2.close()
        db4.close()
        db5.close()
        db6.close()
|
|
|
|
|
|
class TestSelectAsDict(DALtest):
    """``executesql`` results returned as dicts and as ordered dicts."""

    def testSelect(self):
        """Rows are addressable by column name; ordered dicts keep column order."""
        db = self.connect()
        db.define_table(
            'a_table',
            Field('b_field'),
            Field('a_field'),
        )
        db.a_table.insert(a_field="aa1", b_field="bb1")
        sql = "SELECT id, b_field, a_field FROM a_table"

        as_dicts = db.executesql(sql, as_dict=True)
        self.assertEqual(as_dicts[0]['b_field'], 'bb1')

        as_ordered = db.executesql(sql, as_ordered_dict=True)
        self.assertEqual(as_ordered[0]['b_field'], 'bb1')
        column_order = list(as_ordered[0].keys())
        self.assertEqual(column_order, ['id', 'b_field', 'a_field'])
|
|
|
|
|
|
class TestExecuteSQL(DALtest):
    """``executesql`` with ``as_dict``, ``as_ordered_dict``, ``fields`` and ``colnames``."""

    def testSelect(self):
        """Run raw SELECTs against ``a_table`` and check each result shape."""
        db = self.connect(DEFAULT_URI, entity_quoting=False)
        db.define_table(
            'a_table',
            Field('b_field'),
            Field('a_field'),
        )
        db.a_table.insert(a_field="aa1", b_field="bb1")
        expected_columns = ['id', 'b_field', 'a_field']

        # Plain dict rows.
        result = db.executesql("SELECT id, b_field, a_field FROM a_table",
                               as_dict=True)
        self.assertEqual(result[0]['b_field'], 'bb1')

        # Ordered dict rows: keys follow the SELECT column order.
        result = db.executesql("SELECT id, b_field, a_field FROM a_table",
                               as_ordered_dict=True)
        self.assertEqual(result[0]['b_field'], 'bb1')
        self.assertEqual(result[0]['b_field'], 'bb1')
        self.assertEqual(list(result[0].keys()), expected_columns)

        # Rows parsed through the table definition.
        result = db.executesql("select id, b_field, a_field from a_table",
                               fields=db.a_table)
        self.assertTrue(all(x in result[0].keys() for x in expected_columns))
        self.assertEqual(result[0].b_field, 'bb1')

        # Same, with explicit column names.
        result = db.executesql(
            "select id, b_field, a_field from a_table",
            fields=db.a_table,
            colnames=['a_table.id', 'a_table.b_field', 'a_table.a_field'])

        self.assertTrue(all(x in result[0].keys() for x in expected_columns))
        self.assertEqual(result[0].b_field, 'bb1')

        # An aggregate exposed under a custom column name.
        result = db.executesql("select COUNT(*) from a_table",
                               fields=[db.a_table.id.count()],
                               colnames=['foo'])
        self.assertEqual(result[0].foo, 1)
|
|
|
|
class TestRNameTable(DALtest):
    # tests for highly experimental rname attribute

    def testSelect(self):
        """CRUD, joins, subqueries and aliases on tables declared with ``rname``."""
        db = self.connect()
        db.define_table(
            'easy_name',
            Field('a_field'),
            rname='a_very_complicated_tablename'
        )
        easy = db.easy_name

        inserted = easy.insert(a_field='a')
        self.assertEqual(inserted.id, 1)
        found = db(easy.a_field == 'a').select()
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].id, 1)
        self.assertEqual(found[0].a_field, 'a')

        easy.insert(a_field='b')
        deleted = db(easy.id > 0).delete()
        self.assertEqual(deleted, 2)
        remaining = db(easy.id > 0).count()
        self.assertEqual(remaining, 0)

        easy.insert(a_field='a')
        easy.insert(a_field='b')
        total = db(easy.id > 0).count()
        self.assertEqual(total, 2)
        db(easy.a_field == 'a').update(a_field='c')
        with_c = db(easy.a_field == 'c').count()
        self.assertEqual(with_c, 1)
        without_c = db(easy.a_field != 'c').count()
        self.assertEqual(without_c, 1)

        avg = easy.id.avg()
        averaged = db(easy.id > 0).select(avg)
        self.assertEqual(averaged[0][avg], 3)

        db.define_table(
            'person',
            Field('name', default="Michael"),
            Field('uuid'),
            rname='this_is_the_person_table'
        )
        db.define_table(
            'pet',
            Field('friend', 'reference person'),
            Field('name'),
            rname='this_is_the_pet_table'
        )
        person, pet = db.person, db.pet
        michael = person.insert()  # default insert
        john = person.insert(name='John')
        luke = person.insert(name='Luke')

        # michael owns Phippo
        phippo = pet.insert(friend=michael, name="Phippo")
        # john owns Dunstin and Gertie
        pet.insert(friend=john, name="Dunstin")
        pet.insert(friend=john, name="Gertie")

        owned = db(person.id == pet.friend).select(orderby=person.id | pet.id)
        self.assertEqual(len(owned), 3)
        self.assertEqual(owned[0].person.id, michael)
        self.assertEqual(owned[0].person.name, 'Michael')
        self.assertEqual(owned[0].pet.id, phippo)
        self.assertEqual(owned[0].pet.name, 'Phippo')
        self.assertEqual(owned[1].person.id, john)
        self.assertEqual(owned[1].person.name, 'John')
        self.assertEqual(owned[1].pet.name, 'Dunstin')
        self.assertEqual(owned[2].pet.name, 'Gertie')

        # fetch owners, eventually with pet
        # main point is retrieving Luke with no pets
        owners = db(person.id > 0).select(
            orderby=person.id | pet.id,
            left=pet.on(person.id == pet.friend)
        )
        self.assertEqual(owners[0].person.id, michael)
        self.assertEqual(owners[0].person.name, 'Michael')
        self.assertEqual(owners[0].pet.id, phippo)
        self.assertEqual(owners[0].pet.name, 'Phippo')
        self.assertEqual(owners[3].person.name, 'Luke')
        self.assertEqual(owners[3].person.id, luke)
        self.assertEqual(owners[3].pet.name, None)

        # lets test a subquery
        gertie_owner = db(pet.name == "Gertie")._select(pet.friend)
        matched = db(person.id.belongs(gertie_owner)).select()
        self.assertEqual(matched[0].id, 2)
        self.assertEqual(matched[0]('person.name'), 'John')

        # as dict
        by_id = db(person.id > 0).select().as_dict()
        self.assertEqual(by_id[1]['name'], 'Michael')

        # as list
        listed = db(person.id > 0).select().as_list()
        self.assertEqual(listed[0]['name'], 'Michael')

        # isempty
        empty = db(person.id > 0).isempty()
        self.assertEqual(empty, False)

        # join argument
        joined = db(person).select(orderby=person.id | pet.id,
                                   join=pet.on(person.id == pet.friend))
        self.assertEqual(len(joined), 3)
        self.assertEqual(joined[0].person.id, michael)
        self.assertEqual(joined[0].person.name, 'Michael')
        self.assertEqual(joined[0].pet.id, phippo)
        self.assertEqual(joined[0].pet.name, 'Phippo')
        self.assertEqual(joined[1].person.id, john)
        self.assertEqual(joined[1].person.name, 'John')
        self.assertEqual(joined[1].pet.name, 'Dunstin')
        self.assertEqual(joined[2].pet.name, 'Gertie')

        # aliases
        if DEFAULT_URI.startswith('mssql'):
            # multiple cascade gotcha
            for key in ['reference', 'reference FK']:
                db._adapter.types[key] = db._adapter.types[key].replace(
                    '%(on_delete_action)s', 'NO ACTION')
        db.define_table(
            'pet_farm',
            Field('name'),
            Field('father', 'reference pet_farm'),
            Field('mother', 'reference pet_farm'),
            rname='the_cubs'
        )
        farm = db.pet_farm

        minali = farm.insert(name='Minali')
        osbert = farm.insert(name='Osbert')
        # they had a cub
        farm.insert(name='Selina', father=osbert, mother=minali)

        father = farm.with_alias('father')
        mother = farm.with_alias('mother')

        # fetch pets with relatives
        family = db().select(
            farm.name, father.name, mother.name,
            left=[
                father.on(father.id == farm.father),
                mother.on(mother.id == farm.mother)
            ],
            orderby=farm.id
        )

        self.assertEqual(len(family), 3)
        self.assertEqual(family[0].pet_farm.name, 'Minali')
        self.assertEqual(family[0].father.name, None)
        self.assertEqual(family[0].mother.name, None)
        self.assertEqual(family[1].pet_farm.name, 'Osbert')
        self.assertEqual(family[2].pet_farm.name, 'Selina')
        self.assertEqual(family[2].father.name, 'Osbert')
        self.assertEqual(family[2].mother.name, 'Minali')

    def testJoin(self):
        """Inner, left and grouped joins between two tables declared with ``rname``."""
        db = self.connect()
        first_rname = 'this_is_table_t1'
        second_rname = 'this_is_table_t2'
        db.define_table('t1', Field('aa'), rname=first_rname)
        db.define_table('t2', Field('aa'), Field('b', db.t1), rname=second_rname)
        parent1 = db.t1.insert(aa='1')
        parent2 = db.t1.insert(aa='2')
        db.t1.insert(aa='3')
        db.t2.insert(aa='4', b=parent1)
        db.t2.insert(aa='5', b=parent2)
        db.t2.insert(aa='6', b=parent2)

        # Each helper issues a fresh query on every call, so the number of
        # selects matches one-select-per-assertion.
        def inner_join():
            return db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)

        def left_join():
            return db().select(db.t1.ALL, db.t2.ALL,
                               left=db.t2.on(db.t1.id == db.t2.b),
                               orderby=db.t1.aa | db.t2.aa)

        def grouped_counts():
            return db().select(db.t1.aa, db.t2.id.count(),
                               left=db.t2.on(db.t1.id == db.t2.b),
                               orderby=db.t1.aa,
                               groupby=db.t1.aa)

        self.assertEqual(len(inner_join()), 3)
        self.assertEqual(inner_join()[2].t1.aa, '2')
        self.assertEqual(inner_join()[2].t2.aa, '6')

        self.assertEqual(len(left_join()), 4)
        self.assertEqual(left_join()[2].t1.aa, '2')
        self.assertEqual(left_join()[2].t2.aa, '6')
        self.assertEqual(left_join()[3].t1.aa, '3')
        self.assertEqual(left_join()[3].t2.aa, None)

        self.assertEqual(len(grouped_counts()), 3)
        for position, expected in enumerate((1, 2, 0)):
            self.assertEqual(
                grouped_counts()[position]._extra[db.t2.id.count()],
                expected)
        db.t2.drop()
        db.t1.drop()

        db.define_table('person', Field('name'), rname=first_rname)
        person_id = db.person.insert(name="max")
        self.assertEqual(person_id.name, 'max')
        db.define_table('dog',
                        Field('name'),
                        Field('ownerperson', 'reference person'),
                        rname=second_rname)
        db.dog.insert(name='skipper', ownerperson=1)
        owner_row = db(db.person.id == db.dog.ownerperson).select().first()
        self.assertEqual(owner_row[db.person.name], 'max')
        self.assertEqual(owner_row['person.name'], 'max')
        db.dog.drop()
        self.assertEqual(len(db.person._referenced_by), 0)
|
|
|
|
|
|
class TestRNameFields(DALtest):
    # tests for highly experimental rname attribute

    def testSelect(self):
        """CRUD, joins, subqueries and aliases on fields declared with ``rname``."""
        db = self.connect()
        db.define_table(
            'easy_name',
            Field('a_field', rname='a_very_complicated_fieldname'),
            Field('rating', 'integer', rname='rrating_from_1_to_10', default=2)
        )
        easy = db.easy_name

        inserted = easy.insert(a_field='a')
        self.assertEqual(inserted.id, 1)
        found = db(easy.a_field == 'a').select()
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].id, 1)
        self.assertEqual(found[0].a_field, 'a')

        easy.insert(a_field='b')
        deleted = db(easy.id > 0).delete()
        self.assertEqual(deleted, 2)
        remaining = db(easy.id > 0).count()
        self.assertEqual(remaining, 0)

        easy.insert(a_field='a')
        easy.insert(a_field='b')
        total = db(easy.id > 0).count()
        self.assertEqual(total, 2)
        db(easy.a_field == 'a').update(a_field='c')
        with_c = db(easy.a_field == 'c').count()
        self.assertEqual(with_c, 1)
        without_c = db(easy.a_field != 'c').count()
        self.assertEqual(without_c, 1)

        avg = easy.id.avg()
        averaged = db(easy.id > 0).select(avg)
        self.assertEqual(averaged[0][avg], 3)

        avg = easy.rating.avg()
        averaged = db(easy.id > 0).select(avg)
        self.assertEqual(averaged[0][avg], 2)

        db.define_table(
            'person',
            Field('id', type='id', rname='fooid'),
            Field('name', default="Michael", rname='this_is_the_person_name'),
            Field('uuid')
        )
        db.define_table(
            'pet',
            Field('friend', 'reference person'),
            Field('name', rname='this_is_the_pet_name')
        )
        person, pet = db.person, db.pet
        michael = person.insert()  # default insert
        john = person.insert(name='John')
        luke = person.insert(name='Luke')

        # michael owns Phippo
        phippo = pet.insert(friend=michael, name="Phippo")
        # john owns Dunstin and Gertie
        pet.insert(friend=john, name="Dunstin")
        pet.insert(friend=john, name="Gertie")

        owned = db(person.id == pet.friend).select(orderby=person.id | pet.id)
        self.assertEqual(len(owned), 3)
        self.assertEqual(owned[0].person.id, michael)
        self.assertEqual(owned[0].person.name, 'Michael')
        self.assertEqual(owned[0].pet.id, phippo)
        self.assertEqual(owned[0].pet.name, 'Phippo')
        self.assertEqual(owned[1].person.id, john)
        self.assertEqual(owned[1].person.name, 'John')
        self.assertEqual(owned[1].pet.name, 'Dunstin')
        self.assertEqual(owned[2].pet.name, 'Gertie')

        # fetch owners, eventually with pet
        # main point is retrieving Luke with no pets
        owners = db(person.id > 0).select(
            orderby=person.id | pet.id,
            left=pet.on(person.id == pet.friend)
        )
        self.assertEqual(owners[0].person.id, michael)
        self.assertEqual(owners[0].person.name, 'Michael')
        self.assertEqual(owners[0].pet.id, phippo)
        self.assertEqual(owners[0].pet.name, 'Phippo')
        self.assertEqual(owners[3].person.name, 'Luke')
        self.assertEqual(owners[3].person.id, luke)
        self.assertEqual(owners[3].pet.name, None)

        # lets test a subquery
        gertie_owner = db(pet.name == "Gertie")._select(pet.friend)
        matched = db(person.id.belongs(gertie_owner)).select()
        self.assertEqual(matched[0].id, 2)
        self.assertEqual(matched[0]('person.name'), 'John')

        # as dict
        by_id = db(person.id > 0).select().as_dict()
        self.assertEqual(by_id[1]['name'], 'Michael')

        # as list
        listed = db(person.id > 0).select().as_list()
        self.assertEqual(listed[0]['name'], 'Michael')

        # isempty
        empty = db(person.id > 0).isempty()
        self.assertEqual(empty, False)

        # join argument
        joined = db(person).select(orderby=person.id | pet.id,
                                   join=pet.on(person.id == pet.friend))
        self.assertEqual(len(joined), 3)
        self.assertEqual(joined[0].person.id, michael)
        self.assertEqual(joined[0].person.name, 'Michael')
        self.assertEqual(joined[0].pet.id, phippo)
        self.assertEqual(joined[0].pet.name, 'Phippo')
        self.assertEqual(joined[1].person.id, john)
        self.assertEqual(joined[1].person.name, 'John')
        self.assertEqual(joined[1].pet.name, 'Dunstin')
        self.assertEqual(joined[2].pet.name, 'Gertie')

        # aliases
        if DEFAULT_URI.startswith('mssql'):
            # multiple cascade gotcha
            for key in ['reference', 'reference FK']:
                db._adapter.types[key] = db._adapter.types[key].replace(
                    '%(on_delete_action)s', 'NO ACTION')
        db.define_table(
            'pet_farm',
            Field('name', rname='the_cub_name'),
            Field('father', 'reference pet_farm'),
            Field('mother', 'reference pet_farm'),
        )
        farm = db.pet_farm

        minali = farm.insert(name='Minali')
        osbert = farm.insert(name='Osbert')
        # they had a cub
        farm.insert(name='Selina', father=osbert, mother=minali)

        father = farm.with_alias('father')
        mother = farm.with_alias('mother')

        # fetch pets with relatives
        family = db().select(
            farm.name, father.name, mother.name,
            left=[
                father.on(father.id == farm.father),
                mother.on(mother.id == farm.mother)
            ],
            orderby=farm.id
        )

        self.assertEqual(len(family), 3)
        self.assertEqual(family[0].pet_farm.name, 'Minali')
        self.assertEqual(family[0].father.name, None)
        self.assertEqual(family[0].mother.name, None)
        self.assertEqual(family[1].pet_farm.name, 'Osbert')
        self.assertEqual(family[2].pet_farm.name, 'Selina')
        self.assertEqual(family[2].father.name, 'Osbert')
        self.assertEqual(family[2].mother.name, 'Minali')

    def testRun(self):
        """Insert and read back one value per field type, each with an ``rname``."""
        db = self.connect()
        column = 'a_very_complicated_fieldname'

        def first_aa():
            # Fresh select on every call; returns ``aa`` of the first row.
            return db().select(db.tt.aa)[0].aa

        for ft in ['string', 'text', 'password', 'upload', 'blob']:
            db.define_table('tt', Field('aa', ft, default='', rname=column))
            self.assertEqual(db.tt.insert(aa='x'), 1)
            self.assertEqual(first_aa(), 'x')
            db.tt.drop()

        # (field type, declared default, value inserted and expected back)
        scalar_cases = (
            ('integer', 1, 3),
            ('double', 1, 3.1),
            ('boolean', True, True),
        )
        for ftype, default, value in scalar_cases:
            db.define_table('tt', Field('aa', ftype, default=default, rname=column))
            self.assertEqual(db.tt.insert(aa=value), 1)
            self.assertEqual(first_aa(), value)
            db.tt.drop()

        db.define_table('tt', Field('aa', 'json', default={}, rname=column))
        self.assertEqual(db.tt.insert(aa={}), 1)
        self.assertEqual(first_aa(), {})
        db.tt.drop()

        db.define_table(
            'tt',
            Field('aa', 'date', default=datetime.date.today(), rname=column))
        today = datetime.date.today()
        self.assertEqual(db.tt.insert(aa=today), 1)
        self.assertEqual(first_aa(), today)
        db.tt.drop()

        db.define_table(
            'tt',
            Field('aa', 'datetime', default=datetime.datetime.today(), rname=column))
        stamp = datetime.datetime(1971, 12, 21, 10, 30, 55, 0)
        self.assertEqual(db.tt.insert(aa=stamp), 1)
        self.assertEqual(first_aa(), stamp)

        ## Row APIs
        row = db().select(db.tt.aa)[0]
        self.assertEqual(db.tt[1].aa, stamp)
        self.assertEqual(db.tt['aa'], db.tt.aa)
        self.assertEqual(db.tt(1).aa, stamp)
        self.assertTrue(db.tt(1, aa=None) == None)
        self.assertFalse(db.tt(1, aa=stamp) == None)
        self.assertEqual(row.aa, stamp)
        self.assertEqual(row['aa'], stamp)
        self.assertEqual(row['tt.aa'], stamp)
        self.assertEqual(row('tt.aa'), stamp)
        self.assertTrue('aa' in row)
        self.assertTrue('pydal' not in row)
        self.assertTrue(hasattr(row, 'aa'))
        self.assertFalse(hasattr(row, 'pydal'))

        ## Lazy and Virtual fields
        db.tt.b = Field.Virtual(lambda row: row.tt.aa)
        db.tt.c = Field.Lazy(lambda row: row.tt.aa)
        row = db().select(db.tt.aa)[0]
        self.assertEqual(row.b, stamp)
        self.assertEqual(row.c(), stamp)

        db.tt.drop()
        db.define_table('tt', Field('aa', 'time', default='11:30', rname=column))
        clock = datetime.time(10, 30, 55)
        self.assertEqual(db.tt.insert(aa=clock), 1)
        self.assertEqual(first_aa(), clock)

    def testInsert(self):
        """Insert, count, update and delete through a field with an ``rname``."""
        db = self.connect()
        db.define_table('tt', Field('aa', rname='a_very_complicated_fieldname'))
        for expected_id in (1, 2, 3):
            self.assertEqual(db.tt.insert(aa='1'), expected_id)
        self.assertEqual(db(db.tt.aa == '1').count(), 3)
        self.assertEqual(db(db.tt.aa == '2').isempty(), True)
        self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
        self.assertEqual(db(db.tt.aa == '2').count(), 3)
        self.assertEqual(db(db.tt.aa == '2').isempty(), False)
        self.assertEqual(db(db.tt.aa == '2').delete(), 3)
        self.assertEqual(db(db.tt.aa == '2').isempty(), True)

    def testJoin(self):
        """Inner, left and grouped joins over fields declared with ``rname``."""
        db = self.connect()
        aa_rname = 'this_is_field_aa'
        ref_rname = 'this_is_field_b'
        db.define_table('t1', Field('aa', rname=aa_rname))
        db.define_table('t2',
                        Field('aa', rname=aa_rname),
                        Field('b', db.t1, rname=ref_rname))
        parent1 = db.t1.insert(aa='1')
        parent2 = db.t1.insert(aa='2')
        db.t1.insert(aa='3')
        db.t2.insert(aa='4', b=parent1)
        db.t2.insert(aa='5', b=parent2)
        db.t2.insert(aa='6', b=parent2)

        # Each helper issues a fresh query on every call, so the number of
        # selects matches one-select-per-assertion.
        def inner_join():
            return db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)

        def left_join():
            return db().select(db.t1.ALL, db.t2.ALL,
                               left=db.t2.on(db.t1.id == db.t2.b),
                               orderby=db.t1.aa | db.t2.aa)

        def grouped_counts():
            return db().select(db.t1.aa, db.t2.id.count(),
                               left=db.t2.on(db.t1.id == db.t2.b),
                               orderby=db.t1.aa,
                               groupby=db.t1.aa)

        self.assertEqual(len(inner_join()), 3)
        self.assertEqual(inner_join()[2].t1.aa, '2')
        self.assertEqual(inner_join()[2].t2.aa, '6')

        self.assertEqual(len(left_join()), 4)
        self.assertEqual(left_join()[2].t1.aa, '2')
        self.assertEqual(left_join()[2].t2.aa, '6')
        self.assertEqual(left_join()[3].t1.aa, '3')
        self.assertEqual(left_join()[3].t2.aa, None)

        self.assertEqual(len(grouped_counts()), 3)
        for position, expected in enumerate((1, 2, 0)):
            self.assertEqual(
                grouped_counts()[position]._extra[db.t2.id.count()],
                expected)
        db.t2.drop()
        db.t1.drop()

        db.define_table('person', Field('name', rname=aa_rname))
        person_id = db.person.insert(name="max")
        self.assertEqual(person_id.name, 'max')
        db.define_table('dog',
                        Field('name', rname=aa_rname),
                        Field('ownerperson', 'reference person', rname=ref_rname))
        db.dog.insert(name='skipper', ownerperson=1)
        owner_row = db(db.person.id == db.dog.ownerperson).select().first()
        self.assertEqual(owner_row[db.person.name], 'max')
        self.assertEqual(owner_row['person.name'], 'max')
        db.dog.drop()
        self.assertEqual(len(db.person._referenced_by), 0)

    def testTFK(self):
        """Join through a two-column key whose fields all carry an ``rname``."""
        db = self.connect()
        if 'reference TFK' not in db._adapter.types:
            self.skipTest('Adapter does not support TFK references')
        db.define_table(
            't1',
            Field('id1', type='string', length=1, rname='foo1'),
            Field('id2', type='integer', rname='foo2'),
            Field('val', type='integer'),
            primarykey=['id1', 'id2'])
        db.define_table(
            't2',
            Field('ref1', type=db.t1.id1, rname='bar1'),
            Field('ref2', type=db.t1.id2, rname='bar2'))
        db.t1.insert(id1='a', id2=1, val=10)
        db.t1.insert(id1='a', id2=2, val=30)
        db.t2.insert(ref1='a', ref2=1)
        both_keys = (db.t1.id1 == db.t2.ref1) & (db.t1.id2 == db.t2.ref2)
        matches = db(both_keys).select(db.t1.ALL)
        self.assertEqual(len(matches), 1)
        only = matches[0]
        self.assertEqual(only['id1'], 'a')
        self.assertEqual(only['id2'], 1)
        self.assertEqual(only['val'], 10)
|
|
|
|
|
|
class TestQuoting(DALtest):
    """Entity quoting and case sensitivity of table and field names."""

    # tests for case sensitivity
    def testCase(self):
        """Reference a mixed-case field, then try two fields differing only by case.

        When the backend rejects the case-only difference (MySQL, SQLite, or
        an error whose second argument reports duplicate column names), the
        transaction is rolled back and the test ends early. Any other
        exception is re-raised.
        """
        db = self.connect(ignore_field_case=False, entity_quoting=True)
        if DEFAULT_URI.startswith('mssql'):
            # multiple cascade gotcha
            for key in ['reference', 'reference FK']:
                db._adapter.types[key] = db._adapter.types[key].replace(
                    '%(on_delete_action)s', 'NO ACTION')

        t0 = db.define_table('t0',
                             Field('f', 'string'))
        t1 = db.define_table('b',
                             Field('B', t0),
                             Field('words', 'text'))

        blather = 'blah blah and so'
        t0[0] = {'f': 'content'}
        t1[0] = {'B': int(t0[1]['id']),
                 'words': blather}

        r = db(db.t0.id == db.b.B).select()

        self.assertEqual(r[0].b.words, blather)

        t1.drop()
        t0.drop()

        # test field case
        try:
            t0 = db.define_table('table_is_a_test',
                                 Field('a_a'),
                                 Field('a_A'))
        except Exception as e:
            # some db does not support case sensitive field names mysql is one of them.
            if DEFAULT_URI.startswith(('mysql:', 'sqlite:')):
                db.rollback()
                return
            # Exceptions with fewer than two args (or a non-text second arg)
            # must fall through to the re-raise below instead of failing
            # with an IndexError/TypeError that hides the real error.
            detail = e.args[1] if len(e.args) > 1 else None
            if (isinstance(detail, basestring) and
                    'Column names in each table must be unique' in detail):
                db.rollback()
                return
            # Bare ``raise`` keeps the original traceback.
            raise

        t0[0] = dict(a_a='a_a', a_A='a_A')

        self.assertEqual(t0[1].a_a, 'a_a')
        self.assertEqual(t0[1].a_A, 'a_A')

    def testPKFK(self):
        """Define tables keyed and referenced through a non-surrogate primary key."""

        # test primary keys

        db = self.connect(ignore_field_case=False)
        if DEFAULT_URI.startswith('mssql'):
            # multiple cascade gotcha
            for key in ['reference', 'reference FK']:
                db._adapter.types[key] = db._adapter.types[key].replace(
                    '%(on_delete_action)s', 'NO ACTION')
        # test table without surrogate key. Length is limited to
        # 100 because of MySQL limitations: it cannot handle more than
        # 767 bytes in unique keys.

        t0 = db.define_table('t0', Field('Code', length=100), primarykey=['Code'])
        t2 = db.define_table('t2', Field('f'), Field('t0_Code', 'reference t0'))
        t3 = db.define_table('t3', Field('f', length=100), Field('t0_Code', t0.Code), primarykey=['f'])
        t4 = db.define_table('t4', Field('f', length=100), Field('t0', t0), primarykey=['f'])

        # A reference to an undefined table: if this raises, it must be a
        # KeyError. No exception at all is tolerated here.
        try:
            t5 = db.define_table('t5', Field('f', length=100), Field('t0', 'reference no_table_wrong_reference'), primarykey=['f'])
        except Exception as e:
            self.assertIsInstance(e, KeyError)

        if DEFAULT_URI.startswith('mssql'):
            # there's no drop cascade in mssql
            t3.drop()
            t4.drop()
            t2.drop()
            t0.drop()
        else:
            t0.drop('cascade')
            t2.drop()
            t3.drop()
            t4.drop()
|
|
|
|
|
|
class TestTableAndFieldCase(unittest.TestCase):
    """
    At the Python level we should not allow both db.C and db.c, because of
    .table file conflicts on Windows, but it should be possible to map two
    different Python names onto distinct tables "c" and "C".
    By default Python model names should be mapped into lower case table
    names and assume case insensitivity.
    """

    def testme(self):
        # Placeholder: nothing is checked yet.
        return None
|
|
|
|
|
|
class TestQuotesByDefault(unittest.TestCase):
    """
    All default table names should be quoted unless an explicit mapping has
    been given for a table.
    """

    def testme(self):
        # Placeholder: nothing is checked yet.
        return None
|
|
|
|
class TestGis(DALtest):
|
|
|
|
def testGeometry(self):
    """Insert point/line/polygon geometries and query them (PostgreSQL only)."""
    from pydal import geoPoint, geoLine, geoPolygon
    if not IS_POSTGRESQL:
        return
    db = self.connect()
    t0 = db.define_table('t0', Field('point', 'geometry()'))
    t1 = db.define_table('t1', Field('line', 'geometry(public, 4326, 2)'))
    t2 = db.define_table('t2', Field('polygon', 'geometry(public, 4326, 2)'))

    def first_as_text(table, geometry):
        # Select the text form of ``geometry`` and read it from the first
        # row; a new ``st_astext()`` expression is built for the select and
        # again for the lookup.
        record = db(table.id).select(geometry.st_astext()).first()
        return record[geometry.st_astext()]

    t0.insert(point=geoPoint(1, 1))
    self.assertEqual(first_as_text(db.t0, db.t0.point), "POINT(1 1)")

    t1.insert(line=geoLine((1, 1), (2, 2)))
    self.assertEqual(first_as_text(db.t1, db.t1.line), "LINESTRING(1 1,2 2)")

    t2.insert(polygon=geoPolygon((0, 0), (2, 0), (2, 2), (0, 2), (0, 0)))
    self.assertEqual(first_as_text(db.t2, db.t2.polygon),
                     "POLYGON((0 0,2 0,2 2,0 2,0 0))")

    crossing = t0.point.st_intersects(geoLine((0, 0), (2, 2)))
    hit = db(crossing).select(db.t0.point).first()[db.t0.point]
    self.assertEqual(hit, "POINT(1 1)")

    containing = t2.polygon.st_contains(geoPoint(1, 1))
    self.assertEqual(db(containing).count(), 1)

    x = t0.point.st_x()
    y = t0.point.st_y()
    coords = db(t0.id).select(x, y).first()
    self.assertEqual(coords[x], 1)
    self.assertEqual(coords[y], 1)
|
|
|
|
def testGeometryCase(self):
|
|
from pydal import geoPoint, geoLine, geoPolygon
|
|
if not IS_POSTGRESQL: return
|
|
db = self.connect(ignore_field_case=False)
|
|
t0 = db.define_table('t0', Field('point', 'geometry()'), Field('Point', 'geometry()'))
|
|
t0.insert(point=geoPoint(1,1))
|
|
t0.insert(Point=geoPoint(2,2))
|
|
|
|
def testGisMigration(self):
|
|
if not IS_POSTGRESQL: return
|
|
for b in [True, False]:
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=b)
|
|
t0 = db.define_table('t0', Field('Point', 'geometry()'),
|
|
Field('rname_point', 'geometry()', rname='foo'))
|
|
db.commit()
|
|
db.close()
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=b)
|
|
t0 = db.define_table('t0', Field('New_point', 'geometry()'))
|
|
t0.drop()
|
|
db.commit()
|
|
db.close()
|
|
|
|
|
|
class TestSQLCustomType(DALtest):
    """Fields whose type is supplied through ``SQLCustomType`` wrappers."""

    def testRun(self):
        """Store and read back NULLs and real values through custom types."""
        db = self.connect()
        from pydal.helpers.classes import SQLCustomType

        # Use the adapter's own native type names when it exposes them,
        # otherwise fall back to the generic ones.
        if hasattr(db._adapter, 'types'):
            type_map = db._adapter.types
            native_double = type_map['double']
            native_string = type_map['string'] % {'length': 256}
        else:
            native_double = "double"
            native_string = "string"

        price_type = SQLCustomType(type="double", native=native_double)
        product_type = SQLCustomType(type="string", native=native_string)
        t0 = db.define_table('t0',
                             Field("price", price_type),
                             Field("product", product_type))

        # NULLs come back as None
        null_id = t0.insert(price=None, product=None)
        fetched = db(t0.id == null_id).select(t0.ALL).first()
        self.assertEqual(fetched['price'], None)
        self.assertEqual(fetched['product'], None)

        # Real values come back unchanged
        car_id = t0.insert(price=1.2, product="car")
        fetched = db(t0.id == car_id).select(t0.ALL).first()
        self.assertEqual(fetched['price'], 1.2)
        self.assertEqual(fetched['product'], 'car')
        t0.drop()

        import zlib

        def _deflate(value):
            return zlib.compress(value or '', 1)

        def _inflate(blob):
            return zlib.decompress(blob)

        compressed = SQLCustomType(
            type='text',
            native='text',
            encoder=_deflate,
            decoder=_inflate,
        )
        # Only the table definition is exercised; the insert/select
        # round-trip below is disabled.
        db.define_table('t0', Field('cdata', compressed))
        #r_id=t1.insert(cdata="car")
        #row=db(t1.id == r_id).select(t1.ALL).first()
        #self.assertEqual(row['cdata'], "'car'")
|
|
|
|
|
|
class TestLazy(DALtest):
    """Behaviour of tables declared on a connection with ``lazy_tables=True``."""

    def testRun(self):
        """A table is listed in ``db._LAZY_TABLES`` until it is first used."""
        db = self.connect(lazy_tables=True)
        db.define_table('t0', Field('name'))
        self.assertIn('t0', db._LAZY_TABLES)
        db.t0.insert(name='1')
        self.assertNotIn('t0', db._LAZY_TABLES)

    def testLazyGetter(self):
        """Touch a field of a lazy table, then insert into it and its parent."""
        db = self.connect(check_reserved=None, lazy_tables=True)
        db.define_table('tt', Field('value', 'integer'))
        db.define_table('ttt',
                        Field('value', 'integer'),
                        Field('tt_id', 'reference tt'),
                        )
        # Force table definition
        db.ttt.value.writable = False
        idd = db.tt.insert(value=0)
        db.ttt.insert(tt_id=idd)

    def testRowNone(self):
        """A NULL column reads as ``None`` through every supported row accessor."""
        db = self.connect(check_reserved=None, lazy_tables=True)
        db.define_table('tt', Field('value', 'integer'))
        db.tt.insert(value=None)
        row = db(db.tt).select(db.tt.ALL).first()
        self.assertIsNone(row.value)
        self.assertIsNone(row[db.tt.value])
        self.assertIsNone(row['tt.value'])
        self.assertIsNone(row.get('tt.value'))
        self.assertIsNone(row['value'])
        self.assertIsNone(row.get('value'))

    def testRowExtra(self):
        """Selecting by the raw string ``'value'`` still exposes ``row.value``."""
        db = self.connect(check_reserved=None, lazy_tables=True)
        db.define_table('tt', Field('value', 'integer'))
        db.tt.insert(value=1)
        row = db(db.tt).select('value').first()
        self.assertEqual(row.value, 1)
|
|
|
|
class TestRedefine(unittest.TestCase):
    """Redefining a table with ``redefine=True`` replaces its field list."""

    def testRun(self):
        """After redefinition only the new field is present on the table.

        The connection is opened by hand (not through a test helper), so it
        is closed in a ``finally`` clause to avoid leaking it when one of
        the assertions fails.
        """
        db = DAL(DEFAULT_URI, check_reserved=['all'], lazy_tables=True, migrate=False)
        try:
            db.define_table('t_a', Field('code'))
            # Check both attribute and item access to the table
            self.assertIn('code', db.t_a)
            self.assertIn('code', db['t_a'])

            db.define_table('t_a', Field('code_a'), redefine=True)
            self.assertNotIn('code', db.t_a)
            self.assertNotIn('code', db['t_a'])
            self.assertIn('code_a', db.t_a)
            self.assertIn('code_a', db['t_a'])
        finally:
            db.close()
|
|
|
|
|
|
class TestUpdateInsert(DALtest):
    """Checks for ``Table.update_or_insert``."""

    def testRun(self):
        """First call inserts, second call updates the same record.

        The test expects the first call to return a non-None id and the
        second call, whose query matches the record just created, to
        return None and leave a single row carrying the new name.
        """
        db = self.connect()
        t0 = db.define_table('t0', Field('name'))
        i_id = t0.update_or_insert((t0.id == 1), name='web2py')
        u_id = t0.update_or_insert((t0.id == i_id), name='web2py2')
        self.assertIsNotNone(i_id)
        self.assertIsNone(u_id)
        self.assertEqual(db(t0).count(), 1)
        self.assertEqual(db(t0.name == 'web2py').count(), 0)
        self.assertEqual(db(t0.name == 'web2py2').count(), 1)
|
|
|
|
|
|
class TestBulkInsert(DALtest):
    """Checks for ``Table.bulk_insert`` and its after-insert callbacks."""

    def testRun(self):
        """Bulk-insert ten rows and count the after-insert callback calls.

        The callback is appended to ``t0._after_insert``; the test expects
        it to be invoked once per inserted item with an ``OpRow`` as its
        first argument. The invocation counter lives in the module-level
        global ``ctr``.
        """
        db = self.connect()
        t0 = db.define_table('t0', Field('name'))
        global ctr
        ctr = 0

        def test_after_insert(i, r):
            self.assertIsInstance(i, OpRow)
            global ctr
            ctr += 1
            return True

        t0._after_insert.append(test_after_insert)
        items = [{'name': 'web2py_%s' % pos} for pos in range(0, 10, 1)]
        t0.bulk_insert(items)
        self.assertEqual(db(t0).count(), len(items))
        # every generated name must be stored exactly once
        for pos in range(0, 10, 1):
            self.assertEqual(db(t0.name == 'web2py_%s' % pos).count(), 1)
        self.assertEqual(ctr, len(items))
|
|
|
|
|
|
class TestRecordVersioning(DALtest):
    """Checks for ``Table._enable_record_versioning``."""

    def testRun(self):
        """Deletes and updates leave one visible row and two archived rows."""
        db = self.connect()

        def check_rowcount(table, expected):
            # select() and count() must agree on the number of rows
            self.assertEqual(len(db(table).select()), expected)
            self.assertEqual(db(table).count(), expected)

        live = db.define_table(
            't0',
            Field('name'),
            Field('is_active', writable=False, readable=False, default=True))
        live._enable_record_versioning(archive_name='t0_archive')
        self.assertTrue('t0_archive' in db)

        kept_id = live.insert(name='web2py1')
        live.insert(name='web2py2')

        # Deleting one of the two records leaves a single visible row
        db(live.name == 'web2py2').delete()
        check_rowcount(live, 1)

        # Updating the surviving record does not change the visible count
        db(live.id == kept_id).update(name='web2py3')
        check_rowcount(live, 1)

        # One archive row is expected for the delete and one for the update
        check_rowcount(db.t0_archive, 2)
|
|
|
|
|
|
@unittest.skipIf(IS_SQLITE, "Skip sqlite")
class TestConnection(unittest.TestCase):
    """Connection closing and pooling checks (skipped on SQLite)."""

    def testRun(self):
        """Walk through four close/pool scenarios on hand-made connections."""
        # 1. After DAL.close() the underlying connection must be unusable
        db = DAL(DEFAULT_URI, check_reserved=['all'])
        raw = db._adapter.connection
        db.close()
        self.assertRaises(Exception, raw.commit)

        # 2. With pool_size set, open/close cycles reuse a single connection
        seen = set()
        for _ in range(10):
            pooled = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
            seen.add(pooled._adapter.connection)
            pooled.close()
        self.assertEqual(len(seen), 1)
        survivor = seen.pop()
        survivor.commit()
        survivor.close()

        # 3. Ten simultaneously open DALs leave only pool_size connections
        #    in the pool once all of them are closed
        opened = []
        for _ in range(10):
            db3 = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
            opened.append(db3)
        for handle in opened:
            handle.close()
        self.assertEqual(len(db3._adapter.POOLS[DEFAULT_URI]), 5)
        for leftover in db3._adapter.POOLS[DEFAULT_URI]:
            leftover.close()
        db3._adapter.POOLS[DEFAULT_URI] = []

        # 4. Clean close if a connection is broken (closed explicitly):
        #    nothing may end up in the pool
        for _ in range(10):
            db4 = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
            db4._adapter.connection.close()
            db4.close()
        self.assertEqual(len(db4._adapter.POOLS[DEFAULT_URI]), 0)
|
|
|
|
class TestSerializers(DALtest):
    """Checks for the ``as_*`` serialisation methods of query results."""

    def testAsJson(self):
        """``as_json()`` output of a datetime row is parseable by ``json``."""
        db = self.connect()
        db.define_table('tt', Field('date_field', 'datetime'))
        db.tt.insert(date_field=datetime.datetime.now())
        serialized = db().select(db.tt.ALL).as_json()
        # standard-library json, imported at the top of this module
        json.loads(serialized)

    def testSelectIterselect(self):
        """select() and iterselect() results serialise identically."""
        db = self.connect()
        db.define_table('tt', Field('tt'))
        db.tt.insert(tt='pydal')
        for method in ('as_dict', 'as_csv', 'as_json', 'as_xml', 'as_list'):
            eager = db(db.tt).select()
            lazy = db(db.tt).iterselect()
            from_select = getattr(eager, method)()
            from_iterselect = getattr(lazy, method)()
            self.assertEqual(from_select, from_iterselect,
                             'failed %s' % method)
|
|
|
|
|
|
class TestIterselect(DALtest):
    """Checks for ``Set.iterselect`` and the iterator object it returns.

    The assertions below depend on the state of the returned iterator, so
    the order of the statements matters.
    """

    def testRun(self):
        """Exercise iteration, first(), truthiness, indexing, next() and compact."""
        db = self.connect()
        t0 = db.define_table('t0', Field('name'))
        names = ['web2py', 'pydal', 'Massimo']
        for n in names:
            t0.insert(name=n)

        # Baseline: a regular select returns the names in insertion order
        rows = db(db.t0).select(orderby=db.t0.id)
        for pos, r in enumerate(rows):
            self.assertEqual(r.name, names[pos])
        # Testing basic iteration
        rows = db(db.t0).iterselect(orderby=db.t0.id)
        for pos, r in enumerate(rows):
            self.assertEqual(r.name, names[pos])
        # Testing IterRows.first before basic iteration: first() is called
        # twice and the loop that follows still expects names[0] at position 0
        rows = db(db.t0).iterselect(orderby=db.t0.id)
        self.assertEqual(rows.first().name, names[0])
        self.assertEqual(rows.first().name, names[0])

        for pos, r in enumerate(rows):
            self.assertEqual(r.name, names[pos])
        # Testing IterRows.__nonzero__ before basic iteration: the truth test
        # must not prevent the loop from seeing every row from position 0
        rows = db(db.t0).iterselect(orderby=db.t0.id)
        if rows:
            for pos, r in enumerate(rows):
                self.assertEqual(r.name, names[pos])

        # Empty iterRows: none of the inserted names equals "IterRows", so
        # the result is expected to be falsy and the loop presumably has
        # nothing to iterate over
        rows = db(db.t0.name=="IterRows").iterselect(orderby=db.t0.id)
        self.assertEqual(bool(rows), False)
        for pos, r in enumerate(rows):
            self.assertEqual(r.name, names[pos])

        # Testing IterRows.__getitem__
        rows = db(db.t0).iterselect(orderby=db.t0.id)
        self.assertEqual(rows[0].name, names[0])
        self.assertEqual(rows[1].name, names[1])
        # recall the same item
        self.assertEqual(rows[1].name, names[1])
        self.assertEqual(rows[2].name, names[2])
        # once index 2 has been read, asking for index 1 again is expected
        # to raise IndexError
        self.assertRaises(IndexError, rows.__getitem__, 1)

        # Testing IterRows.next()
        rows = db(db.t0).iterselect(orderby=db.t0.id)
        for n in names:
            self.assertEqual(next(rows).name, n)
        self.assertRaises(StopIteration, next, rows)

        # Testing IterRows.compact: with compact disabled the fields are
        # reached through the table name (row.t0.name)
        rows = db(db.t0).iterselect(orderby=db.t0.id)
        rows.compact = False
        for n in names:
            self.assertEqual(next(rows).t0.name, n)

    @unittest.skipIf(IS_MSSQL, "Skip mssql")
    def testMultiSelect(self):
        """Run writes and a nested iterselect while an iterselect is being consumed."""
        # Iterselect holds the cursors until all elements have been evaluated
        # inner queries use new cursors
        db = self.connect()
        t0 = db.define_table('t0', Field('name'), Field('name_copy'))
        db(db.t0).delete()
        db.commit()
        names = ['web2py', 'pydal', 'Massimo']
        for n in names:
            t0.insert(name=n)
        c = 0
        # Update each row while the outer iterator is still open
        for r in db(db.t0).iterselect():
            db.t0.update_or_insert(db.t0.id == r.id, name_copy = r.name)
            c += 1

        self.assertEqual(c, len(names), "The iterator is not looping over all elements")
        # update_or_insert must have updated, not inserted: row count unchanged
        self.assertEqual(db(db.t0).count(), len(names))
        c = 0
        # Two nested iterselects over the same table, writing inside the
        # inner loop; the inner body must run len(names) ** 2 times
        for x in db(db.t0).iterselect(orderby=db.t0.id):
            for y in db(db.t0).iterselect(orderby=db.t0.id):
                db.t0.update_or_insert(db.t0.id == x.id, name_copy = x.name)
                c += 1

        self.assertEqual(c, len(names)*len(names))
        self.assertEqual(db(db.t0).count(), len(names))
        db._adapter.test_connection()

    @unittest.skipIf(IS_SQLITE | IS_MSSQL, "Skip sqlite & ms sql")
    def testMultiSelectWithCommit(self):
        """Commit after every update while an iterselect is being consumed."""
        db = self.connect()
        t0 = db.define_table('t0', Field('nn', 'integer'))
        for n in xrange(1, 100, 1):
            t0.insert(nn=n)
        db.commit()
        # Sum of nn before the updates, used as the reference value
        s = db.t0.nn.sum()
        tot = db(db.t0).select(s).first()[s]
        c = 0
        # Double nn on every row, committing inside the loop
        for r in db(db.t0).iterselect(db.t0.ALL):
            db.t0.update_or_insert(db.t0.id == r.id, nn = r.nn * 2)
            db.commit()
            c += 1

        # Every row was visited exactly once and the sum has doubled
        self.assertEqual(c, db(db.t0).count())
        self.assertEqual(tot * 2, db(db.t0).select(s).first()[s])

        db._adapter.test_connection()
|
|
|
|
if __name__ == '__main__':
    # unittest.main() ends by raising SystemExit, so a plain statement placed
    # after it is never reached. Running the cleanup from a finally clause
    # makes it actually execute. tearDownModule() only removes files that
    # exist, so calling it more than once is harmless.
    try:
        unittest.main()
    finally:
        tearDownModule()
|