2318 lines
93 KiB
Python
2318 lines
93 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Unit tests for NoSQL adapters
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
import sys
|
|
import os
|
|
import glob
|
|
import datetime
|
|
from ._compat import unittest
|
|
|
|
from pydal._compat import PY2, basestring, StringIO, to_bytes, long
|
|
from pydal import DAL, Field
|
|
from pydal.objects import Table, Query, Expression
|
|
from pydal.helpers.classes import SQLALL, OpRow
|
|
from pydal.exceptions import NotOnNOSQLError
|
|
from ._adapt import DEFAULT_URI, IS_IMAP, drop, IS_GAE, IS_MONGODB, _quote
|
|
|
|
if IS_IMAP:
|
|
from pydal.adapters import IMAPAdapter
|
|
from pydal.contrib import mockimaplib
|
|
IMAPAdapter.driver = mockimaplib
|
|
elif IS_MONGODB:
|
|
from pydal.adapters.mongo import Expansion
|
|
elif IS_GAE:
|
|
# setup GAE dummy database
|
|
from google.appengine.ext import testbed
|
|
gaetestbed = testbed.Testbed()
|
|
gaetestbed.activate()
|
|
gaetestbed.init_datastore_v3_stub()
|
|
gaetestbed.init_memcache_stub()
|
|
|
|
print('Testing against %s engine (%s)' % (DEFAULT_URI.partition(':')[0],
|
|
DEFAULT_URI))
|
|
|
|
ALLOWED_DATATYPES = [
|
|
'string',
|
|
'text',
|
|
'integer',
|
|
'boolean',
|
|
'double',
|
|
'blob',
|
|
'date',
|
|
'time',
|
|
'datetime',
|
|
'upload',
|
|
'password',
|
|
'json',
|
|
]
|
|
|
|
def setUpModule():
|
|
if not IS_IMAP:
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
|
|
def clean_table(db, tablename):
|
|
try:
|
|
db.define_table(tablename)
|
|
except Exception as e:
|
|
pass
|
|
try:
|
|
drop(db[tablename])
|
|
except Exception as e:
|
|
pass
|
|
|
|
for tablename in ['tt', 't0', 't1', 't2', 't3', 't4',
|
|
'easy_name', 'tt_archive', 'pet_farm', 'person']:
|
|
clean_table(db, tablename)
|
|
db.close()
|
|
|
|
|
|
def tearDownModule():
|
|
if os.path.isfile('sql.log'):
|
|
os.unlink('sql.log')
|
|
for a in glob.glob('*.table'):
|
|
os.unlink(a)
|
|
|
|
|
|
@unittest.skipIf(not IS_MONGODB, "Skipping MongoDB Tests")
|
|
class TestMongo(unittest.TestCase):
|
|
""" Tests specific to MongoDB, error and side path exercisers, etc
|
|
"""
|
|
|
|
def testVersionCheck(self):
|
|
driver_args={'fake_version': '2.9 Phony'}
|
|
with self.assertRaises(Exception):
|
|
db = DAL(DEFAULT_URI, attempts=1, check_reserved=['all'],
|
|
driver_args=driver_args)
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa', 'reference'))
|
|
with self.assertRaises(ValueError):
|
|
db.tt.insert(aa='x')
|
|
with self.assertRaises(ValueError):
|
|
db.tt.insert(aa='_')
|
|
with self.assertRaises(TypeError):
|
|
db.tt.insert(aa=3.1)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='<random>'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='0x1'), long), True)
|
|
with self.assertRaises(RuntimeError):
|
|
db(db.tt.aa+1==1).update(aa=0)
|
|
drop(db.tt)
|
|
|
|
db.define_table('tt', Field('aa', 'date'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=None), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, None)
|
|
drop(db.tt)
|
|
|
|
db.define_table('tt', Field('aa', 'time'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=None), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, None)
|
|
with self.assertRaises(RuntimeError):
|
|
db(db.tt.aa <= None).count()
|
|
with self.assertRaises(NotImplementedError):
|
|
db._adapter.select(
|
|
Query(db, db._adapter.dialect.aggregate, db.tt.aa, 'UNKNOWN'),
|
|
[db.tt.aa], {})
|
|
with self.assertRaises(NotImplementedError):
|
|
db._adapter.select(
|
|
Expression(
|
|
db, db._adapter.dialect.extract, db.tt.aa, 'UNKNOWN',
|
|
'integer'),
|
|
[db.tt.aa], {})
|
|
drop(db.tt)
|
|
|
|
db.define_table('tt', Field('aa', 'integer'))
|
|
case=(db.tt.aa == 0).case(db.tt.aa + 2)
|
|
with self.assertRaises(SyntaxError):
|
|
db(case).count()
|
|
drop(db.tt)
|
|
|
|
db.define_table('tt', Field('aa'), Field('bb', 'integer'),
|
|
Field('cc', 'list:integer'))
|
|
db.tt.insert(aa="aa")
|
|
|
|
with self.assertRaises(NotImplementedError):
|
|
db((db.tt.aa+1).contains(db.tt.aa)).count()
|
|
with self.assertRaises(NotImplementedError):
|
|
db(db.tt.cc.contains(db.tt.aa)).count()
|
|
with self.assertRaises(NotImplementedError):
|
|
db(db.tt.aa.contains(db.tt.cc)).count()
|
|
with self.assertRaises(NotImplementedError):
|
|
db(db.tt.aa.contains(1.0)).count()
|
|
with self.assertRaises(NotImplementedError):
|
|
db().select(db.tt.aa.lower()[4:-1]).first()
|
|
with self.assertRaises(NotOnNOSQLError):
|
|
db(db.tt.aa.belongs(db()._select(db.tt.aa))).count()
|
|
with self.assertRaises(RuntimeError):
|
|
db(db.tt.aa.lower()).update(aa='bb')
|
|
with self.assertRaises(NotImplementedError):
|
|
db(db.tt).select(orderby='<random>')
|
|
with self.assertRaises(RuntimeError):
|
|
db().select()
|
|
with self.assertRaises(RuntimeError):
|
|
Expansion(db._adapter, 'delete',
|
|
Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'), [True])
|
|
with self.assertRaises(RuntimeError):
|
|
Expansion(db._adapter, 'delete',
|
|
Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'), [True])
|
|
with self.assertRaises(RuntimeError):
|
|
expanded = Expansion(db._adapter, 'count',
|
|
Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'), [True])
|
|
expanded = Expansion(db._adapter, 'count',
|
|
Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'), [])
|
|
self.assertEqual(db._adapter.expand(expanded).query_dict, {'aa': 'x'})
|
|
|
|
if db._adapter.server_version_major >= 2.6:
|
|
with self.assertRaises(RuntimeError):
|
|
db(db.tt).update(id=1)
|
|
else:
|
|
db(db.tt).update(id=1)
|
|
self.assertNotEqual(db(db.tt.aa=='aa').select(db.tt.id).response[0][0], 1)
|
|
drop(db.tt)
|
|
|
|
db.close()
|
|
|
|
for safe in [False, True, False]:
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa='x'), long), True)
|
|
with self.assertRaises(RuntimeError):
|
|
db._adapter.delete(db['tt'], 'x', safe=safe)
|
|
self.assertEqual(db._adapter.delete(
|
|
db['tt'], Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'), safe=safe), 1)
|
|
self.assertEqual(db(db.tt.aa=='x').count(), 0)
|
|
self.assertEqual(
|
|
db._adapter.update(
|
|
db['tt'],
|
|
Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'),
|
|
db['tt']._fields_and_values_for_update(
|
|
{'aa':'x'}).op_values(),
|
|
safe=safe
|
|
), 0)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
def testJoin(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa', 'integer'), Field('b', 'reference tt'))
|
|
i1 = db.tt.insert(aa=1)
|
|
db.tt.insert(aa=4, b=i1)
|
|
q = db.tt.b==db.tt.id
|
|
with self.assertRaises(NotOnNOSQLError):
|
|
db(db.tt).select(left=db.tt.on(q))
|
|
with self.assertRaises(NotOnNOSQLError):
|
|
db(db.tt).select(join=db.tt.on(q))
|
|
with self.assertRaises(NotOnNOSQLError):
|
|
db(db.tt).select(db.tt.on(q))
|
|
with self.assertRaises(TypeError):
|
|
db(db.tt).select(UNKNOWN=True)
|
|
db(db.tt).select(for_update=True)
|
|
self.assertEqual(db(db.tt).count(), 2)
|
|
db.tt.truncate()
|
|
self.assertEqual(db(db.tt).count(), 0)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestFields(unittest.TestCase):
|
|
|
|
def testFieldName(self):
|
|
"""
|
|
- a "str" something
|
|
- not a method or property of Table
|
|
- "dotted-notation" friendly:
|
|
- a valid python identifier
|
|
- not a python keyword
|
|
- not starting with underscore or an integer
|
|
- not containing dots
|
|
|
|
Basically, anything alphanumeric, no symbols, only underscore as
|
|
punctuation
|
|
"""
|
|
|
|
# Check that Fields cannot start with underscores
|
|
self.assertRaises(SyntaxError, Field, '_abc', 'string')
|
|
|
|
# Check that Fields cannot contain punctuation other than underscores
|
|
self.assertRaises(SyntaxError, Field, 'a.bc', 'string')
|
|
|
|
# Check that Fields cannot be a name of a method or property of Table
|
|
for x in ['drop', 'on', 'truncate']:
|
|
self.assertRaises(SyntaxError, Field, x, 'string')
|
|
|
|
# Check that Fields allows underscores in the body of a field name.
|
|
self.assertTrue(Field('a_bc', 'string'),
|
|
"Field isn't allowing underscores in fieldnames. It should.")
|
|
|
|
# Check that Field names don't allow a python keyword
|
|
self.assertRaises(SyntaxError, Field, 'True', 'string')
|
|
self.assertRaises(SyntaxError, Field, 'elif', 'string')
|
|
self.assertRaises(SyntaxError, Field, 'while', 'string')
|
|
|
|
# Check that Field names don't allow a non-valid python identifier
|
|
non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
|
|
for a in non_valid_examples:
|
|
self.assertRaises(SyntaxError, Field, a, 'string')
|
|
|
|
# Check that Field names don't allow a unicode string
|
|
non_valid_examples = non_valid_examples = ["ℙƴ☂ℌøἤ", u"ℙƴ☂ℌøἤ",
|
|
u'àè', u'ṧøмℯ', u'тεṧт', u'♥αłüℯṧ',
|
|
u'ℊεᾔ℮яαт℮∂', u'♭ƴ', u'ᾔ☤ρℌℓ☺ḓ']
|
|
for a in non_valid_examples:
|
|
self.assertRaises(SyntaxError, Field, a, 'string')
|
|
|
|
def testFieldTypes(self):
|
|
|
|
# Check that string, and password default length is 512
|
|
for typ in ['string', 'password']:
|
|
self.assertTrue(Field('abc', typ).length == 512,
|
|
"Default length for type '%s' is not 512 or 255" % typ)
|
|
|
|
# Check that upload default length is 512
|
|
self.assertTrue(Field('abc', 'upload').length == 512,
|
|
"Default length for type 'upload' is not 512")
|
|
|
|
# Check that Tables passed in the type creates a reference
|
|
self.assertTrue(Field('abc', Table(None, 'temp')).type
|
|
== 'reference temp',
|
|
'Passing a Table does not result in a reference type.')
|
|
|
|
def testFieldLabels(self):
|
|
|
|
# Check that a label is successfully built from the supplied fieldname
|
|
self.assertTrue(Field('abc', 'string').label == 'Abc',
|
|
'Label built is incorrect')
|
|
self.assertTrue(Field('abc_def', 'string').label == 'Abc Def',
|
|
'Label built is incorrect')
|
|
|
|
def testFieldFormatters(self): # Formatter should be called Validator
|
|
|
|
# Test the default formatters
|
|
for typ in ALLOWED_DATATYPES:
|
|
f = Field('abc', typ)
|
|
if typ not in ['date', 'time', 'datetime']:
|
|
isinstance(f.formatter('test'), str)
|
|
else:
|
|
isinstance(f.formatter(datetime.datetime.now()), str)
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
import pickle
|
|
|
|
# some db's only support milliseconds
|
|
datetime_datetime_today = datetime.datetime.today()
|
|
datetime_datetime_today = datetime_datetime_today.replace(
|
|
microsecond = datetime_datetime_today.microsecond -
|
|
datetime_datetime_today.microsecond % 1000)
|
|
|
|
insert_vals = [
|
|
('string', 'x', ''),
|
|
('string', 'A\xc3\xa9 A', ''),
|
|
('text', 'x', ''),
|
|
('password', 'x', ''),
|
|
('upload', 'x', ''),
|
|
('double', 3.1, 1),
|
|
('integer', 3, 1),
|
|
('boolean', True, True),
|
|
('date', datetime.date.today(), datetime.date.today()),
|
|
('datetime', datetime.datetime(1971, 12, 21, 10, 30, 55, 0),
|
|
datetime_datetime_today),
|
|
('time', datetime_datetime_today.time(),
|
|
datetime_datetime_today.time()),
|
|
('blob', 'x', ''),
|
|
('blob', b'xyzzy', ''),
|
|
# pickling a tuple will create a string which is not UTF-8 able.
|
|
('blob', pickle.dumps((0,), pickle.HIGHEST_PROTOCOL), ''),
|
|
]
|
|
|
|
if not IS_GAE:
|
|
# these are unsupported by GAE
|
|
insert_vals.append(('blob', bytearray('a','utf-8'), ''))
|
|
insert_vals.append(('json', {'a': 'b', 'c': [1, 2]}, {}))
|
|
|
|
for iv in insert_vals:
|
|
db.define_table('tt', Field('aa', iv[0], default=iv[2]))
|
|
# empty string stored to blob returns None
|
|
default_return = None if iv[0] == 'blob' and iv[2] == '' else iv[2]
|
|
self.assertTrue(isinstance(db.tt.insert(), long))
|
|
self.assertTrue(isinstance(db.tt.insert(aa=iv[1]), long))
|
|
self.assertTrue(isinstance(db.tt.insert(aa=None), long))
|
|
cv = iv[1]
|
|
if IS_MONGODB and not PY2 and iv[0] == 'blob':
|
|
cv = to_bytes(iv[1])
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, default_return)
|
|
self.assertEqual(db().select(db.tt.aa)[1].aa, cv)
|
|
self.assertEqual(db().select(db.tt.aa)[2].aa, None)
|
|
|
|
if not IS_GAE:
|
|
## field aliases
|
|
row = db().select(db.tt.aa.with_alias('zz'))[1]
|
|
self.assertEqual(row['zz'], cv)
|
|
|
|
drop(db.tt)
|
|
|
|
## Row APIs
|
|
db.define_table('tt', Field('aa', 'datetime',
|
|
default=datetime.datetime.today()))
|
|
t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0)
|
|
id = db.tt.insert(aa=t0)
|
|
self.assertEqual(isinstance(id, long), True)
|
|
|
|
row = db().select(db.tt.aa)[0]
|
|
self.assertEqual(db.tt[id].aa,t0)
|
|
self.assertEqual(db.tt['aa'],db.tt.aa)
|
|
self.assertEqual(db.tt(id).aa,t0)
|
|
self.assertTrue(db.tt(id,aa=None)==None)
|
|
self.assertFalse(db.tt(id,aa=t0)==None)
|
|
self.assertEqual(row.aa,t0)
|
|
self.assertEqual(row['aa'],t0)
|
|
self.assertEqual(row['tt.aa'],t0)
|
|
self.assertEqual(row('tt.aa'),t0)
|
|
|
|
## Lazy and Virtual fields
|
|
db.tt.b = Field.Virtual(lambda row: row.tt.aa)
|
|
db.tt.c = Field.Lazy(lambda row: row.tt.aa)
|
|
row = db().select(db.tt.aa)[0]
|
|
self.assertEqual(row.b,t0)
|
|
self.assertEqual(row.c(),t0)
|
|
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'time', default='11:30'))
|
|
t0 = datetime.time(10, 30, 55)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestTables(unittest.TestCase):
|
|
|
|
def testTableNames(self):
|
|
"""
|
|
- a "str" something
|
|
- not a method or property of DAL
|
|
- "dotted-notation" friendly:
|
|
- a valid python identifier
|
|
- not a python keyword
|
|
- not starting with underscore or an integer
|
|
- not containing dots
|
|
|
|
Basically, anything alphanumeric, no symbols, only underscore as
|
|
punctuation
|
|
"""
|
|
|
|
# Check that Tables cannot start with underscores
|
|
self.assertRaises(SyntaxError, Table, None, '_abc')
|
|
|
|
# Check that Tables cannot contain punctuation other than underscores
|
|
self.assertRaises(SyntaxError, Table, None, 'a.bc')
|
|
|
|
# Check that Tables cannot be a name of a method or property of DAL
|
|
for x in ['define_table', 'tables', 'as_dict']:
|
|
self.assertRaises(SyntaxError, Table, None, x)
|
|
|
|
# Check that Table allows underscores in the body of a field name.
|
|
self.assertTrue(Table(None, 'a_bc'),
|
|
"Table isn't allowing underscores in tablename. It should.")
|
|
|
|
# Check that Table names don't allow a python keyword
|
|
self.assertRaises(SyntaxError, Table, None, 'True')
|
|
self.assertRaises(SyntaxError, Table, None, 'elif')
|
|
self.assertRaises(SyntaxError, Table, None, 'while')
|
|
|
|
# Check that Table names don't allow a non-valid python identifier
|
|
non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
|
|
for a in non_valid_examples:
|
|
self.assertRaises(SyntaxError, Table, None, a)
|
|
|
|
# Check that Table names don't allow a unicode string
|
|
non_valid_examples = ["ℙƴ☂ℌøἤ", u"ℙƴ☂ℌøἤ",
|
|
u'àè', u'ṧøмℯ', u'тεṧт', u'♥αłüℯṧ',
|
|
u'ℊεᾔ℮яαт℮∂', u'♭ƴ', u'ᾔ☤ρℌℓ☺ḓ']
|
|
for a in non_valid_examples:
|
|
self.assertRaises(SyntaxError, Table, None, a)
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestAll(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.pt = Table(None,'PseudoTable',Field('name'),Field('birthdate'))
|
|
|
|
def testSQLALL(self):
|
|
ans = 'PseudoTable.id, PseudoTable.name, PseudoTable.birthdate'
|
|
self.assertEqual(str(SQLALL(self.pt)), ans)
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestTable(unittest.TestCase):
|
|
|
|
def testTableCreation(self):
|
|
|
|
# Check for error when not passing type other than Field or Table
|
|
|
|
self.assertRaises(SyntaxError, Table, None, 'test', None)
|
|
|
|
persons = Table(None, 'persons',
|
|
Field('firstname','string'),
|
|
Field('lastname', 'string'))
|
|
|
|
# Does it have the correct fields?
|
|
|
|
self.assertTrue(set(persons.fields).issuperset(set(['firstname',
|
|
'lastname'])))
|
|
|
|
# ALL is set correctly
|
|
|
|
self.assertTrue('persons.firstname, persons.lastname'
|
|
in str(persons.ALL))
|
|
|
|
def testTableAlias(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
persons = Table(db, 'persons', Field('firstname',
|
|
'string'), Field('lastname', 'string'))
|
|
aliens = persons.with_alias('aliens')
|
|
|
|
# Are the different table instances with the same fields
|
|
|
|
self.assertTrue(persons is not aliens)
|
|
self.assertTrue(set(persons.fields) == set(aliens.fields))
|
|
db.close()
|
|
|
|
def testTableInheritance(self):
|
|
persons = Table(None, 'persons', Field('firstname',
|
|
'string'), Field('lastname', 'string'))
|
|
customers = Table(None, 'customers',
|
|
Field('items_purchased', 'integer'),
|
|
persons)
|
|
self.assertTrue(set(customers.fields).issuperset(set(
|
|
['items_purchased', 'firstname', 'lastname'])))
|
|
|
|
|
|
class TestInsert(unittest.TestCase):
|
|
def testRun(self):
|
|
if IS_IMAP:
|
|
imap = DAL(DEFAULT_URI)
|
|
imap.define_tables()
|
|
self.assertEqual(imap.Draft.insert(to="nurse@example.com",
|
|
subject="Nurse!",
|
|
sender="gumby@example.com",
|
|
content="Nurse!\r\nNurse!"), 2)
|
|
self.assertEqual(imap.Draft[2].subject, "Nurse!")
|
|
self.assertEqual(imap.Draft[2].sender, "gumby@example.com")
|
|
self.assertEqual(isinstance(imap.Draft[2].uid, long), True)
|
|
self.assertEqual(imap.Draft[2].content[0]["text"], "Nurse!\r\nNurse!")
|
|
imap.close()
|
|
else:
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(db(db.tt.aa == '1').count(), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
|
|
self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').count(), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').isempty(), False)
|
|
self.assertEqual(db(db.tt.aa == '2').delete(), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
|
|
|
|
def callable():
|
|
return 'aa'
|
|
self.assertTrue(isinstance(db.tt.insert(aa=callable), long))
|
|
self.assertEqual(db(db.tt.aa == 'aa').count(), 1)
|
|
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestSelect(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='2'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='3'), long), True)
|
|
self.assertEqual(db(db.tt.id > 0).count(), 3)
|
|
self.assertEqual(db(db.tt.aa).count(), 3)
|
|
self.assertEqual(db(db.tt.id).count(), 3)
|
|
self.assertEqual(db(db.tt.id!=None).count(), 3)
|
|
|
|
self.assertEqual(db(db.tt.id > 0).select(orderby=~db.tt.aa | db.tt.id)[0].aa, '3')
|
|
self.assertEqual(db(db.tt.id > 0).select(orderby=~db.tt.aa)[0].aa, '3')
|
|
self.assertEqual(len(db(db.tt.id > 0).select(limitby=(1, 2))), 1)
|
|
self.assertEqual(db(db.tt.id > 0).select(limitby=(1, 2))[0].aa, '2')
|
|
self.assertEqual(len(db().select(db.tt.ALL)), 3)
|
|
self.assertEqual(db(db.tt.aa == None).count(), 0)
|
|
self.assertEqual(db(db.tt.aa != None).count(), 3)
|
|
self.assertEqual(db(db.tt.aa > '1').count(), 2)
|
|
self.assertEqual(db(db.tt.aa >= '1').count(), 3)
|
|
self.assertEqual(db(db.tt.aa == '1').count(), 1)
|
|
self.assertEqual(db(db.tt.aa != '1').count(), 2)
|
|
self.assertEqual(db(db.tt.aa < '3').count(), 2)
|
|
self.assertEqual(db(db.tt.aa <= '3').count(), 3)
|
|
self.assertEqual(db(db.tt.aa > '1')(db.tt.aa < '3').count(), 1)
|
|
self.assertEqual(db((db.tt.aa > '1') & (db.tt.aa < '3')).count(), 1)
|
|
self.assertEqual(db((db.tt.aa > '1') | (db.tt.aa < '3')).count(), 3)
|
|
# Test not operator
|
|
self.assertEqual(db(~(db.tt.aa != '1')).count(), 1)
|
|
self.assertEqual(db(~(db.tt.aa == '1')).count(), 2)
|
|
self.assertEqual(db((db.tt.aa > '1') & ~(db.tt.aa > '2')).count(), 1)
|
|
self.assertEqual(db(~(db.tt.aa > '1') & (db.tt.aa > '2')).count(), 0)
|
|
self.assertEqual(db(~((db.tt.aa < '1') | (db.tt.aa > '2'))).count(), 2)
|
|
self.assertEqual(db(~((db.tt.aa >= '1') & (db.tt.aa <= '2'))).count(), 1)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
def testListInteger(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt',
|
|
Field('aa', 'list:integer'))
|
|
l=[0,1,2,3,4,5]
|
|
db.tt.insert(aa=l)
|
|
self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa],l)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
def testListString(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt',
|
|
Field('aa', 'list:string'))
|
|
l=['a', 'b', 'c']
|
|
db.tt.insert(aa=l)
|
|
self.assertEqual(db(db.tt).select('tt.aa').first()[db.tt.aa],l)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
def testListReference(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
on_deletes = (
|
|
'CASCADE',
|
|
'SET NULL',
|
|
)
|
|
for ondelete in on_deletes:
|
|
db.define_table('t0', Field('aa', 'string'))
|
|
db.define_table('tt', Field('t0_id', 'list:reference t0',
|
|
ondelete=ondelete))
|
|
id_a1=db.t0.insert(aa='test1')
|
|
id_a2=db.t0.insert(aa='test2')
|
|
ref1=[id_a1]
|
|
ref2=[id_a2]
|
|
ref3=[id_a1, id_a2]
|
|
db.tt.insert(t0_id=ref1)
|
|
self.assertEqual(
|
|
db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1)
|
|
db.tt.insert(t0_id=ref2)
|
|
self.assertEqual(
|
|
db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2)
|
|
db.tt.insert(t0_id=ref3)
|
|
self.assertEqual(
|
|
db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3)
|
|
|
|
if IS_MONGODB:
|
|
self.assertEqual(db(db.tt.t0_id == ref3).count(), 1)
|
|
|
|
self.assertEqual(db(db.tt.t0_id.contains(id_a1)).count(), 2)
|
|
self.assertEqual(db(db.tt.t0_id.contains(id_a2)).count(), 2)
|
|
db(db.t0.aa == 'test1').delete()
|
|
if ondelete == 'SET NULL':
|
|
self.assertEqual(db(db.tt).count(), 3)
|
|
self.assertEqual(db(db.tt).select()[0].t0_id, [])
|
|
if ondelete == 'CASCADE':
|
|
self.assertEqual(db(db.tt).count(), 2)
|
|
self.assertEqual(db(db.tt).select()[0].t0_id, ref2)
|
|
|
|
drop(db.tt)
|
|
drop(db.t0)
|
|
db.close()
|
|
|
|
@unittest.skipIf(IS_GAE, "no groupby in appengine")
|
|
def testGroupByAndDistinct(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt',
|
|
Field('aa'),
|
|
Field('bb', 'integer'),
|
|
Field('cc', 'integer'))
|
|
db.tt.insert(aa='4', bb=1, cc=1)
|
|
db.tt.insert(aa='3', bb=2, cc=1)
|
|
db.tt.insert(aa='3', bb=1, cc=1)
|
|
db.tt.insert(aa='1', bb=1, cc=1)
|
|
db.tt.insert(aa='1', bb=2, cc=1)
|
|
db.tt.insert(aa='1', bb=3, cc=1)
|
|
db.tt.insert(aa='1', bb=4, cc=1)
|
|
db.tt.insert(aa='2', bb=1, cc=1)
|
|
db.tt.insert(aa='2', bb=2, cc=1)
|
|
db.tt.insert(aa='2', bb=3, cc=1)
|
|
self.assertEqual(db(db.tt).count(), 10)
|
|
|
|
# test groupby
|
|
result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa)
|
|
self.assertEqual(len(result), 4)
|
|
result = db().select(db.tt.aa, db.tt.bb.sum(),
|
|
groupby=db.tt.aa, orderby=db.tt.aa)
|
|
self.assertEqual(tuple(result.response[2]), ('3', 3))
|
|
result = db().select(db.tt.aa, db.tt.bb.sum(),
|
|
groupby=db.tt.aa, orderby=~db.tt.aa)
|
|
self.assertEqual(tuple(result.response[1]), ('3', 3))
|
|
result = db().select(db.tt.aa, db.tt.bb, db.tt.cc.sum(),
|
|
groupby=db.tt.aa|db.tt.bb,
|
|
orderby=(db.tt.aa|~db.tt.bb))
|
|
self.assertEqual(tuple(result.response[4]), ('2', 3, 1))
|
|
result = db().select(db.tt.aa, db.tt.bb.sum(),
|
|
groupby=db.tt.aa, orderby=~db.tt.aa, limitby=(1,2))
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(tuple(result.response[0]), ('3', 3))
|
|
result = db().select(db.tt.aa, db.tt.bb.sum(),
|
|
groupby=db.tt.aa, limitby=(0,3))
|
|
self.assertEqual(len(result), 3)
|
|
self.assertEqual(tuple(result.response[2]), ('3', 3))
|
|
|
|
# test having
|
|
self.assertEqual(len(db().select(db.tt.aa, db.tt.bb.sum(),
|
|
groupby=db.tt.aa, having=db.tt.bb.sum() > 2)), 3)
|
|
|
|
# test distinct
|
|
result = db().select(db.tt.aa, db.tt.cc, distinct=True)
|
|
self.assertEqual(len(result), 4)
|
|
result = db().select(db.tt.cc, distinct=True, groupby=db.tt.cc)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0].cc, 1)
|
|
result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa)
|
|
self.assertEqual(result[2].aa, '2')
|
|
self.assertEqual(result[1].aa, '3')
|
|
result = db().select(db.tt.aa, db.tt.bb,
|
|
distinct=True, orderby=(db.tt.aa|~db.tt.bb))
|
|
self.assertEqual(tuple(result.response[4]), ('2', 3))
|
|
result = db().select(db.tt.aa,
|
|
distinct=db.tt.aa, orderby=~db.tt.aa, limitby=(1,2))
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0].aa, '3')
|
|
|
|
# test count distinct
|
|
db.tt.insert(aa='2', bb=3, cc=1)
|
|
self.assertEqual(db(db.tt).count(distinct=db.tt.aa), 4)
|
|
self.assertEqual(db(db.tt).count(distinct=db.tt.aa|db.tt.bb), 10)
|
|
self.assertEqual(db(db.tt).count(distinct=db.tt.aa|db.tt.bb|db.tt.cc), 10)
|
|
self.assertEqual(db(db.tt).count(distinct=True), 10)
|
|
self.assertEqual(db(db.tt.aa).count(db.tt.aa), 4)
|
|
self.assertEqual(db(db.tt.aa).count(), 11)
|
|
count=db.tt.aa.count()
|
|
self.assertEqual(db(db.tt).select(count).first()[count], 11)
|
|
|
|
count=db.tt.aa.count(distinct=True)
|
|
sum=db.tt.bb.sum()
|
|
result = db(db.tt).select(count, sum)
|
|
self.assertEqual(tuple(result.response[0]), (4, 23))
|
|
self.assertEqual(result.first()[count], 4)
|
|
self.assertEqual(result.first()[sum], 23)
|
|
|
|
if not IS_MONGODB or db._adapter.server_version_major >= 2.6:
|
|
# mongo < 2.6 does not support $size
|
|
count=db.tt.aa.count(distinct=True)+db.tt.bb.count(distinct=True)
|
|
self.assertEqual(db(db.tt).select(count).first()[count], 8)
|
|
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
@unittest.skipIf(IS_GAE, "no coalesce in appengine")
|
|
def testCoalesce(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'), Field('bb'), Field('cc'), Field('dd'))
|
|
db.tt.insert(aa='xx')
|
|
db.tt.insert(aa='xx', bb='yy')
|
|
db.tt.insert(aa='xx', bb='yy', cc='zz')
|
|
db.tt.insert(aa='xx', bb='yy', cc='zz', dd='')
|
|
result = db(db.tt).select(db.tt.dd.coalesce(db.tt.cc, db.tt.bb, db.tt.aa))
|
|
self.assertEqual(result.response[0][0], 'xx')
|
|
self.assertEqual(result.response[1][0], 'yy')
|
|
self.assertEqual(result.response[2][0], 'zz')
|
|
self.assertEqual(result.response[3][0], '')
|
|
db.tt.drop()
|
|
|
|
db.define_table('tt', Field('aa', 'integer'), Field('bb'))
|
|
db.tt.insert(bb='')
|
|
db.tt.insert(aa=1)
|
|
result = db(db.tt).select(db.tt.aa.coalesce_zero())
|
|
self.assertEqual(result.response[0][0], 0)
|
|
self.assertEqual(result.response[1][0], 1)
|
|
|
|
db.tt.drop()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestAddMethod(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
|
|
@db.tt.add_method.all
|
|
def select_all(table,orderby=None):
|
|
return table._db(table).select(orderby=orderby)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='2'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='3'), long), True)
|
|
self.assertEqual(len(db.tt.all()), 3)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestBelongs(unittest.TestCase):
|
|
|
|
def __init__(self, *args, **vars):
|
|
unittest.TestCase.__init__(self, *args, **vars)
|
|
self.db = None
|
|
|
|
def setUp(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
self.i_id = db.tt.insert(aa='1')
|
|
self.assertEqual(isinstance(self.i_id, long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='2'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='3'), long), True)
|
|
self.db = db
|
|
|
|
def testRun(self):
|
|
db = self.db
|
|
self.assertEqual(db(db.tt.aa.belongs(('1', '3'))).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.belongs(['1', '3'])).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.belongs(['1', '3'])).count(), 2)
|
|
self.assertEqual(db(db.tt.id.belongs([self.i_id])).count(), 1)
|
|
self.assertEqual(db(db.tt.id.belongs([])).count(), 0)
|
|
|
|
@unittest.skipIf(IS_GAE or IS_MONGODB, "Datastore/Mongodb belongs() does not accept nested queries")
|
|
def testNested(self):
|
|
db = self.db
|
|
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.id == self.i_id)._select(db.tt.aa))).count(), 1)
|
|
|
|
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.aa.belongs(('1',
|
|
'3')))._select(db.tt.aa))).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.aa.belongs(db
|
|
(db.tt.aa.belongs(('1', '3')))._select(db.tt.aa)))._select(
|
|
db.tt.aa))).count(),
|
|
2)
|
|
|
|
def tearDown(self):
|
|
db = self.db
|
|
drop(db.tt)
|
|
db.close()
|
|
self.db = None
|
|
|
|
|
|
@unittest.skipIf(IS_GAE or IS_IMAP, "Contains not supported on GAE Datastore. TODO: IMAP tests")
|
|
class TestContains(unittest.TestCase):
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa', 'list:string'), Field('bb','string'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=['aaa','bbb'],bb='aaa'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=['bbb','ddd'],bb='abb'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=['eee','aaa'],bb='acc'), long), True)
|
|
self.assertEqual(db(db.tt.aa.contains('aaa')).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.contains('bbb')).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.contains('aa')).count(), 0)
|
|
self.assertEqual(db(db.tt.bb.contains('a')).count(), 3)
|
|
self.assertEqual(db(db.tt.bb.contains('b')).count(), 1)
|
|
self.assertEqual(db(db.tt.bb.contains('d')).count(), 0)
|
|
self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
|
|
|
|
# case-sensitivity tests, if 1 it isn't
|
|
is_case_insensitive = db(db.tt.bb.contains('AAA', case_sensitive=True)).count()
|
|
if is_case_insensitive:
|
|
self.assertEqual(db(db.tt.aa.contains('AAA')).count(), 2)
|
|
self.assertEqual(db(db.tt.bb.contains('A')).count(), 3)
|
|
else:
|
|
self.assertEqual(db(db.tt.aa.contains('AAA', case_sensitive=True)).count(), 0)
|
|
self.assertEqual(db(db.tt.bb.contains('A', case_sensitive=True)).count(), 0)
|
|
self.assertEqual(db(db.tt.aa.contains('AAA', case_sensitive=False)).count(), 2)
|
|
self.assertEqual(db(db.tt.bb.contains('A', case_sensitive=False)).count(), 3)
|
|
db.tt.drop()
|
|
|
|
# integers in string fields
|
|
db.define_table('tt', Field('aa', 'list:string'), Field('bb','string'), Field('cc','integer'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=['123','456'],bb='123', cc=12), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=['124','456'],bb='123', cc=123), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=['125','457'],bb='23', cc=125), long), True)
|
|
self.assertEqual(db(db.tt.aa.contains(123)).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.contains(23)).count(), 0)
|
|
self.assertEqual(db(db.tt.aa.contains(db.tt.cc)).count(), 1)
|
|
self.assertEqual(db(db.tt.bb.contains(123)).count(), 2)
|
|
self.assertEqual(db(db.tt.bb.contains(23)).count(), 3)
|
|
self.assertEqual(db(db.tt.bb.contains(db.tt.cc)).count(), 2)
|
|
db.tt.drop()
|
|
|
|
# string field contains string field
|
|
db.define_table('tt', Field('aa'), Field('bb'))
|
|
db.tt.insert(aa='aaa', bb='%aaa')
|
|
db.tt.insert(aa='aaa', bb='aaa')
|
|
self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
|
|
drop(db.tt)
|
|
|
|
# escaping
|
|
db.define_table('tt', Field('aa'))
|
|
db.tt.insert(aa='perc%ent')
|
|
db.tt.insert(aa='percent')
|
|
db.tt.insert(aa='percxyzent')
|
|
db.tt.insert(aa='under_score')
|
|
db.tt.insert(aa='underxscore')
|
|
db.tt.insert(aa='underyscore')
|
|
self.assertEqual(db(db.tt.aa.contains('perc%ent')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.contains('under_score')).count(), 1)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_GAE, "Like not supported on GAE Datastore.")
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestLike(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa='abc'), long), True)
|
|
self.db = db
|
|
|
|
def tearDown(self):
|
|
db = self.db
|
|
drop(db.tt)
|
|
db.close()
|
|
self.db = None
|
|
|
|
def testRun(self):
|
|
db = self.db
|
|
self.assertEqual(db(db.tt.aa.like('a%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%b%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%c')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%d%')).count(), 0)
|
|
self.assertEqual(db(db.tt.aa.like('ab_')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('a_c')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('_bc')).count(), 1)
|
|
|
|
self.assertEqual(db(db.tt.aa.like('A%', case_sensitive=False)).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%B%', case_sensitive=False)).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%C', case_sensitive=False)).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.ilike('A%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.ilike('%B%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.ilike('%C')).count(), 1)
|
|
|
|
#DAL maps like() (and contains(), startswith(), endswith())
|
|
#to the LIKE operator, that in ANSI-SQL is case-sensitive
|
|
#There are backends supporting case-sensitivity by default
|
|
#and backends that needs additional care to turn
|
|
#case-sensitivity on. To discern among those, let's run
|
|
#this query comparing previously inserted 'abc' with 'ABC':
|
|
#if the result is 0, then the backend recognizes
|
|
#case-sensitivity, if 1 it isn't
|
|
is_case_insensitive = db(db.tt.aa.like('%ABC%')).count()
|
|
self.assertEqual(db(db.tt.aa.like('A%')).count(), is_case_insensitive)
|
|
self.assertEqual(db(db.tt.aa.like('%B%')).count(), is_case_insensitive)
|
|
self.assertEqual(db(db.tt.aa.like('%C')).count(), is_case_insensitive)
|
|
|
|
def testUpperLower(self):
|
|
db = self.db
|
|
self.assertEqual(db(db.tt.aa.upper().like('A%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.upper().like('%B%')).count(),1)
|
|
self.assertEqual(db(db.tt.aa.upper().like('%C')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.lower().like('%c')).count(), 1)
|
|
|
|
def testStartsEndsWith(self):
|
|
db = self.db
|
|
self.assertEqual(db(db.tt.aa.startswith('a')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.endswith('c')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.startswith('c')).count(), 0)
|
|
self.assertEqual(db(db.tt.aa.endswith('a')).count(), 0)
|
|
|
|
def testEscaping(self):
|
|
db = self.db
|
|
term = 'ahbc'.replace('h', '\\') #funny but to avoid any doubts...
|
|
db.tt.insert(aa='a%bc')
|
|
db.tt.insert(aa='a_bc')
|
|
db.tt.insert(aa=term)
|
|
self.assertEqual(db(db.tt.aa.like('%ax%bc%', escape='x')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%ax_bc%', escape='x')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%'+term+'%')).count(), 1)
|
|
db(db.tt.id>0).delete()
|
|
# test "literal" like, i.e. exactly as LIKE in the backend
|
|
db.tt.insert(aa='perc%ent')
|
|
db.tt.insert(aa='percent')
|
|
db.tt.insert(aa='percxyzent')
|
|
db.tt.insert(aa='under_score')
|
|
db.tt.insert(aa='underxscore')
|
|
db.tt.insert(aa='underyscore')
|
|
self.assertEqual(db(db.tt.aa.like('%perc%ent%')).count(), 3)
|
|
self.assertEqual(db(db.tt.aa.like('%under_score%')).count(), 3)
|
|
db(db.tt.id>0).delete()
|
|
# escaping with startswith and endswith
|
|
db.tt.insert(aa='%percent')
|
|
db.tt.insert(aa='xpercent')
|
|
db.tt.insert(aa='discount%')
|
|
db.tt.insert(aa='discountx')
|
|
self.assertEqual(db(db.tt.aa.endswith('discount%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('discount%%')).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.startswith('%percent')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%%percent')).count(), 2)
|
|
|
|
def testRegexp(self):
|
|
db = self.db
|
|
db(db.tt.id>0).delete()
|
|
db.tt.insert(aa='%percent')
|
|
db.tt.insert(aa='xpercent')
|
|
db.tt.insert(aa='discount%')
|
|
db.tt.insert(aa='discountx')
|
|
try:
|
|
self.assertEqual(db(db.tt.aa.regexp('count')).count(), 2)
|
|
except NotImplementedError:
|
|
pass
|
|
else:
|
|
self.assertEqual(db(db.tt.aa.lower().regexp('count')).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.upper().regexp('COUNT') &
|
|
db.tt.aa.lower().regexp('count')).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.upper().regexp('COUNT') |
|
|
(db.tt.aa.lower()=='xpercent')).count(), 3)
|
|
|
|
def testLikeInteger(self):
|
|
db = self.db
|
|
db.tt.drop()
|
|
db.define_table('tt', Field('aa', 'integer'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=1111111111), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=1234567), long), True)
|
|
self.assertEqual(db(db.tt.aa.like('1%')).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.like('1_3%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('2%')).count(), 0)
|
|
self.assertEqual(db(db.tt.aa.like('_2%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('12%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('012%')).count(), 0)
|
|
self.assertEqual(db(db.tt.aa.like('%45%')).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.like('%54%')).count(), 0)
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestDatetime(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa', 'datetime'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=datetime.datetime(1971, 12, 21,
|
|
11, 30)), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=datetime.datetime(1971, 11, 21,
|
|
10, 30)), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=datetime.datetime(1970, 12, 21,
|
|
9, 31)), long), True)
|
|
self.assertEqual(db(db.tt.aa == datetime.datetime(1971, 12,
|
|
21, 11, 30)).count(), 1)
|
|
self.assertEqual(db(db.tt.aa >= datetime.datetime(1971, 1, 1)).count(), 2)
|
|
|
|
if IS_MONGODB:
|
|
self.assertEqual(db(db.tt.aa.year() == 1971).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.month() > 11).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.day() >= 21).count(), 3)
|
|
self.assertEqual(db(db.tt.aa.hour() < 10).count(), 1)
|
|
self.assertEqual(db(db.tt.aa.minutes() <= 30).count(), 2)
|
|
self.assertEqual(db(db.tt.aa.seconds() != 31).count(), 3)
|
|
self.assertEqual(db(db.tt.aa.epoch() < 365*24*3600).delete(), 1)
|
|
drop(db.tt)
|
|
|
|
db.define_table('tt', Field('aa', 'time'))
|
|
t0 = datetime.time(10, 30, 55)
|
|
db.tt.insert(aa=t0)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
|
|
drop(db.tt)
|
|
|
|
db.define_table('tt', Field('aa', 'date'))
|
|
t0 = datetime.date.today()
|
|
db.tt.insert(aa=t0)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_GAE or IS_IMAP, "Expressions are not supported")
|
|
class TestExpressions(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
if IS_MONGODB:
|
|
DAL_OPTS = (
|
|
(True, {'adapter_args': {'safe': True}}),
|
|
(False, {'adapter_args': {'safe': False}}),
|
|
)
|
|
for dal_opt in DAL_OPTS:
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], **dal_opt[1])
|
|
db.define_table('tt', Field('aa', 'integer'),
|
|
Field('bb', 'integer', default=0), Field('cc'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=1), long), dal_opt[0])
|
|
self.assertEqual(isinstance(db.tt.insert(aa=2), long), dal_opt[0])
|
|
self.assertEqual(isinstance(db.tt.insert(aa=3), long), dal_opt[0])
|
|
|
|
# test update
|
|
self.assertEqual(db(db.tt.aa == 3).update(aa=db.tt.aa + 1,
|
|
bb=db.tt.bb + 2), 1)
|
|
self.assertEqual(db(db.tt.aa == 4).count(), 1)
|
|
self.assertEqual(db(db.tt.bb == 2).count(), 1)
|
|
self.assertEqual(db(db.tt.aa == -2).count(), 0)
|
|
self.assertEqual(db(db.tt.aa == 4).update(aa=db.tt.aa * 2, bb=5), 1)
|
|
self.assertEqual(db(db.tt.bb == 5).count(), 1)
|
|
self.assertEqual(db(db.tt.aa + 1 == 9).count(), 1)
|
|
self.assertEqual(db(db.tt.aa + 1 == 9).update(aa=db.tt.aa - 2,
|
|
cc='cc'), 1)
|
|
self.assertEqual(db(db.tt.cc == 'cc').count(), 1)
|
|
self.assertEqual(db(db.tt.aa == 6).count(), 1)
|
|
self.assertEqual(db(db.tt.aa == 6).update(bb=db.tt.aa *
|
|
(db.tt.bb - 3)), 1)
|
|
self.assertEqual(db(db.tt.bb == 12).count(), 1)
|
|
self.assertEqual(db(db.tt.aa == 6).count(), 1)
|
|
self.assertEqual(db(db.tt.aa == 6).update(aa=db.tt.aa % 4 + 1,
|
|
cc=db.tt.cc + '1' +'1'), 1)
|
|
self.assertEqual(db(db.tt.cc == 'cc11').count(), 1)
|
|
self.assertEqual(db(db.tt.aa == 3).count(), 1)
|
|
|
|
# test comparsion expression based count
|
|
self.assertEqual(db(db.tt.aa != db.tt.aa).count(), 0)
|
|
self.assertEqual(db(db.tt.aa == db.tt.aa).count(), 3)
|
|
|
|
# test select aggregations
|
|
sum = (db.tt.aa + 1).sum()
|
|
self.assertEqual(db(db.tt.aa + 1 >= 3).select(sum).first()[sum], 7)
|
|
self.assertEqual(db((1==0) & (db.tt.aa >= db.tt.aa)).count(), 0)
|
|
self.assertEqual(db(db.tt.aa * 2 == -2).select(sum).first()[sum], None)
|
|
|
|
count=db.tt.aa.count()
|
|
avg=db.tt.aa.avg()
|
|
min=db.tt.aa.min()
|
|
max=db.tt.aa.max()
|
|
result = db(db.tt).select(sum, count, avg, min, max).first()
|
|
self.assertEqual(result[sum], 9)
|
|
self.assertEqual(result[count], 3)
|
|
self.assertEqual(result[avg], 2)
|
|
self.assertEqual(result[min], 1)
|
|
self.assertEqual(result[max], 3)
|
|
|
|
# Test basic expressions evaluated at python level
|
|
self.assertEqual(db((1==1) & (db.tt.aa >= 2)).count(), 2)
|
|
self.assertEqual(db((1==1) | (db.tt.aa >= 2)).count(), 3)
|
|
self.assertEqual(db((1==0) & (db.tt.aa >= 2)).count(), 0)
|
|
self.assertEqual(db((1==0) | (db.tt.aa >= 2)).count(), 2)
|
|
|
|
# test abs()
|
|
self.assertEqual(db(db.tt.aa == 2).update(aa=db.tt.aa*-10), 1)
|
|
abs=db.tt.aa.abs().with_alias('abs')
|
|
result = db(db.tt.aa == -20).select(abs).first()
|
|
self.assertEqual(result[abs], 20)
|
|
self.assertEqual(result['abs'], 20)
|
|
abs=db.tt.aa.abs()/10+5
|
|
exp=abs.min()*2+1
|
|
result = db(db.tt.aa == -20).select(exp).first()
|
|
self.assertEqual(result[exp], 15)
|
|
|
|
# test case()
|
|
condition = db.tt.aa > 2
|
|
case = condition.case(db.tt.aa + 2, db.tt.aa - 2)
|
|
my_case = case.with_alias('my_case')
|
|
result = db().select(my_case)
|
|
self.assertEqual(len(result), 3)
|
|
self.assertEqual(result[0][my_case], -1)
|
|
self.assertEqual(result[0]['my_case'], -1)
|
|
self.assertEqual(result[1]['my_case'], -22)
|
|
self.assertEqual(result[2]['my_case'], 5)
|
|
|
|
# test expression based delete
|
|
self.assertEqual(db(db.tt.aa + 1 >= 4).count(), 1)
|
|
self.assertEqual(db(db.tt.aa + 1 >= 4).delete(), 1)
|
|
self.assertEqual(db(db.tt.aa).count(), 2)
|
|
|
|
# cleanup
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
def testUpdate(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
|
|
# some db's only support seconds
|
|
datetime_datetime_today = datetime.datetime.today()
|
|
datetime_datetime_today = datetime_datetime_today.replace(
|
|
microsecond = 0)
|
|
one_day = datetime.timedelta(1)
|
|
one_sec = datetime.timedelta(0,1)
|
|
|
|
update_vals = (
|
|
('string', 'x', 'y'),
|
|
('text', 'x', 'y'),
|
|
('password', 'x', 'y'),
|
|
('integer', 1, 2),
|
|
('bigint', 1, 2),
|
|
('float', 1.0, 2.0),
|
|
('double', 1.0, 2.0),
|
|
('boolean', True, False),
|
|
('date', datetime.date.today(), datetime.date.today() + one_day),
|
|
('datetime', datetime.datetime(1971, 12, 21, 10, 30, 55, 0),
|
|
datetime_datetime_today),
|
|
('time', datetime_datetime_today.time(),
|
|
(datetime_datetime_today + one_sec).time()),
|
|
)
|
|
|
|
for uv in update_vals:
|
|
db.define_table('tt', Field('aa', 'integer', default=0),
|
|
Field('bb', uv[0]))
|
|
self.assertTrue(isinstance(db.tt.insert(bb=uv[1]), long))
|
|
self.assertEqual(db(db.tt.aa + 1 == 1).select(db.tt.bb)[0].bb, uv[1])
|
|
self.assertEqual(db(db.tt.aa + 1 == 1).update(bb=uv[2]), 1)
|
|
self.assertEqual(db(db.tt.aa / 3 == 0).select(db.tt.bb)[0].bb, uv[2])
|
|
db.tt.drop()
|
|
db.close()
|
|
|
|
def testSubstring(self):
|
|
if IS_MONGODB:
|
|
# MongoDB does not support string length
|
|
end = 3
|
|
else:
|
|
end = -2
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
t0 = db.define_table('t0', Field('name'))
|
|
input_name = "web2py"
|
|
t0.insert(name=input_name)
|
|
exp_slice = t0.name.lower()[4:6]
|
|
exp_slice_no_max = t0.name.lower()[4:]
|
|
exp_slice_neg_max = t0.name.lower()[2:end]
|
|
exp_slice_neg_start = t0.name.lower()[end:]
|
|
exp_item = t0.name.lower()[3]
|
|
out = db(t0).select(exp_slice, exp_item, exp_slice_no_max,
|
|
exp_slice_neg_max, exp_slice_neg_start).first()
|
|
self.assertEqual(out[exp_slice], input_name[4:6])
|
|
self.assertEqual(out[exp_item], input_name[3])
|
|
self.assertEqual(out[exp_slice_no_max], input_name[4:])
|
|
self.assertEqual(out[exp_slice_neg_max], input_name[2:end])
|
|
self.assertEqual(out[exp_slice_neg_start], input_name[end:])
|
|
t0.drop()
|
|
db.close()
|
|
|
|
def testOps(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
t0 = db.define_table('t0', Field('vv', 'integer'))
|
|
self.assertTrue(isinstance(db.t0.insert(vv=1), long))
|
|
self.assertTrue(isinstance(db.t0.insert(vv=2), long))
|
|
self.assertTrue(isinstance(db.t0.insert(vv=3), long))
|
|
sum = db.t0.vv.sum()
|
|
count = db.t0.vv.count()
|
|
avg=db.t0.vv.avg()
|
|
op = sum/count
|
|
op1 = (sum/count).with_alias('tot')
|
|
self.assertEqual(db(t0).select(op).first()[op], 2)
|
|
self.assertEqual(db(t0).select(op1).first()[op1], 2)
|
|
self.assertEqual(db(t0).select(op1).first()['tot'], 2)
|
|
op2 = avg*count
|
|
self.assertEqual(db(t0).select(op2).first()[op2], 6)
|
|
# the following is not possible at least on sqlite
|
|
sum = db.t0.vv.sum().with_alias('s')
|
|
count = db.t0.vv.count().with_alias('c')
|
|
op = sum/count
|
|
with self.assertRaises(SyntaxError):
|
|
self.assertEqual(db(t0).select(op).first()[op], 2)
|
|
t0.drop()
|
|
db.close()
|
|
|
|
|
|
@unittest.skip("JOIN queries are not supported")
|
|
class TestJoin(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('t1', Field('aa'))
|
|
db.define_table('t2', Field('aa'), Field('b', db.t1))
|
|
i1 = db.t1.insert(aa='1')
|
|
i2 = db.t1.insert(aa='2')
|
|
i3 = db.t1.insert(aa='3')
|
|
db.t2.insert(aa='4', b=i1)
|
|
db.t2.insert(aa='5', b=i2)
|
|
db.t2.insert(aa='6', b=i2)
|
|
self.assertEqual(len(db(db.t1.id
|
|
== db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)), 3)
|
|
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)[2].t1.aa, '2')
|
|
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)[2].t2.aa, '6')
|
|
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)), 4)
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
|
|
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa, groupby=db.t1.aa)),
|
|
3)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
|
|
1)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
|
|
2)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
|
|
0)
|
|
drop(db.t2)
|
|
drop(db.t1)
|
|
|
|
db.define_table('person',Field('name'))
|
|
id = db.person.insert(name="max")
|
|
self.assertEqual(id.name,'max')
|
|
db.define_table('dog',Field('name'),Field('ownerperson','reference person'))
|
|
db.dog.insert(name='skipper',ownerperson=1)
|
|
row = db(db.person.id==db.dog.ownerperson).select().first()
|
|
self.assertEqual(row[db.person.name],'max')
|
|
self.assertEqual(row['person.name'],'max')
|
|
drop(db.dog)
|
|
self.assertEqual(len(db.person._referenced_by),0)
|
|
drop(db.person)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_GAE or IS_IMAP, 'TODO: Datastore throws "AttributeError: Row object has no attribute _extra"')
|
|
class TestMinMaxSumAvg(unittest.TestCase):
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa', 'integer'))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=1), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=2), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=3), long), True)
|
|
s = db.tt.aa.min()
|
|
self.assertEqual(db(db.tt.id > 0).select(s)[0]._extra[s], 1)
|
|
self.assertEqual(db(db.tt.id > 0).select(s).first()[s], 1)
|
|
self.assertEqual(db().select(s).first()[s], 1)
|
|
s = db.tt.aa.max()
|
|
self.assertEqual(db().select(s).first()[s], 3)
|
|
s = db.tt.aa.sum()
|
|
self.assertEqual(db().select(s).first()[s], 6)
|
|
s = db.tt.aa.count()
|
|
self.assertEqual(db().select(s).first()[s], 3)
|
|
s = db.tt.aa.avg()
|
|
self.assertEqual(db().select(s).first()[s], 2)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestMigrations(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'), migrate='.storage.table')
|
|
db.commit()
|
|
db.close()
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'), Field('b'),
|
|
migrate='.storage.table')
|
|
db.commit()
|
|
db.close()
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'), Field('b', 'text'),
|
|
migrate='.storage.table')
|
|
db.commit()
|
|
db.close()
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'), migrate='.storage.table')
|
|
drop(db.tt)
|
|
db.commit()
|
|
db.close()
|
|
|
|
def tearDown(self):
|
|
if os.path.exists('.storage.db'):
|
|
os.unlink('.storage.db')
|
|
if os.path.exists('.storage.table'):
|
|
os.unlink('.storage.table')
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestReference(unittest.TestCase):
|
|
def testRun(self):
|
|
scenarios = (
|
|
(True, 'CASCADE'),
|
|
(False, 'CASCADE'),
|
|
(False, 'SET NULL'),
|
|
)
|
|
for (b, ondelete) in scenarios:
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], bigint_id=b)
|
|
db.define_table('tt', Field('name'),
|
|
Field('aa','reference tt',ondelete=ondelete))
|
|
db.commit()
|
|
x = db.tt.insert(name='xxx')
|
|
self.assertTrue(isinstance(x, long))
|
|
self.assertEqual(x.id, x)
|
|
self.assertEqual(x['id'], x)
|
|
x.aa = x
|
|
x.update_record()
|
|
x1 = db.tt[x]
|
|
self.assertEqual(x1.aa, x)
|
|
self.assertEqual(x1.aa.aa.aa.aa.aa.aa.name, 'xxx')
|
|
y=db.tt.insert(name='yyy', aa = x1)
|
|
self.assertEqual(y.aa, x1.id)
|
|
self.assertTrue(isinstance(db.tt.insert(name='zzz'), long))
|
|
self.assertEqual(db(db.tt.name).count(), 3)
|
|
if IS_MONGODB:
|
|
db(db.tt.id == x).delete()
|
|
expected_count = {
|
|
'SET NULL': 2,
|
|
'CASCADE': 1,
|
|
}
|
|
self.assertEqual(db(db.tt.name).count(), expected_count[ondelete])
|
|
if ondelete == 'SET NULL':
|
|
self.assertEqual(
|
|
db(db.tt.name == 'yyy').select()[0].aa, None)
|
|
drop(db.tt)
|
|
db.commit()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP")
|
|
class TestClientLevelOps(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
db.commit()
|
|
db.tt.insert(aa="test")
|
|
rows1 = db(db.tt.aa=='test').select()
|
|
rows2 = db(db.tt.aa=='test').select()
|
|
rows3 = rows1 + rows2
|
|
assert len(rows3) == 2
|
|
rows4 = rows1 & rows2
|
|
assert len(rows4) == 1
|
|
rows5 = rows1 | rows2
|
|
assert len(rows5) == 1
|
|
rows6 = rows1.find(lambda row: row.aa=="test")
|
|
assert len(rows6) == 1
|
|
rows7 = rows2.exclude(lambda row: row.aa=="test")
|
|
assert len(rows7) == 1
|
|
rows8 = rows5.sort(lambda row: row.aa)
|
|
assert len(rows8) == 1
|
|
drop(db.tt)
|
|
db.commit()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestVirtualFields(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt', Field('aa'))
|
|
db.commit()
|
|
db.tt.insert(aa="test")
|
|
class Compute:
|
|
def a_upper(row): return row.tt.aa.upper()
|
|
db.tt.virtualfields.append(Compute())
|
|
assert db(db.tt.id>0).select().first().a_upper == 'TEST'
|
|
drop(db.tt)
|
|
db.commit()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestComputedFields(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('tt',
|
|
Field('aa'),
|
|
Field('bb',default='x'),
|
|
Field('cc',compute=lambda r: r.aa+r.bb))
|
|
db.commit()
|
|
id = db.tt.insert(aa="z")
|
|
self.assertEqual(db.tt[id].cc,'zx')
|
|
drop(db.tt)
|
|
db.commit()
|
|
|
|
# test checking that a compute field can refer to earlier-defined computed fields
|
|
db.define_table('tt',
|
|
Field('aa'),
|
|
Field('bb',default='x'),
|
|
Field('cc',compute=lambda r: r.aa+r.bb),
|
|
Field('dd',compute=lambda r: r.bb + r.cc))
|
|
db.commit()
|
|
id = db.tt.insert(aa="z")
|
|
self.assertEqual(db.tt[id].dd,'xzx')
|
|
drop(db.tt)
|
|
db.commit()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestCommonFilters(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('t1', Field('aa', 'integer'))
|
|
db.define_table('t2', Field('aa', 'integer'), Field('b', db.t1))
|
|
i1 = db.t1.insert(aa=1)
|
|
i2 = db.t1.insert(aa=2)
|
|
i3 = db.t1.insert(aa=3)
|
|
db.t2.insert(aa=4, b=i1)
|
|
db.t2.insert(aa=5, b=i2)
|
|
db.t2.insert(aa=6, b=i2)
|
|
db.t1._common_filter = lambda q: db.t1.aa>1
|
|
self.assertEqual(db(db.t1).count(),2)
|
|
self.assertEqual(db(db.t1).count(),2)
|
|
db.t2._common_filter = lambda q: db.t2.aa<6
|
|
# test delete
|
|
self.assertEqual(db(db.t2).count(),2)
|
|
db(db.t2).delete()
|
|
self.assertEqual(db(db.t2).count(),0)
|
|
db.t2._common_filter = None
|
|
self.assertEqual(db(db.t2).count(),1)
|
|
# test update
|
|
db.t2.insert(aa=4, b=i1)
|
|
db.t2.insert(aa=5, b=i2)
|
|
db.t2._common_filter = lambda q: db.t2.aa<6
|
|
self.assertEqual(db(db.t2).count(),2)
|
|
db(db.t2).update(aa=6)
|
|
self.assertEqual(db(db.t2).count(),0)
|
|
db.t2._common_filter = None
|
|
self.assertEqual(db(db.t2).count(),3)
|
|
drop(db.t2)
|
|
drop(db.t1)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP test")
|
|
class TestImportExportFields(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('person', Field('name'))
|
|
db.define_table('pet',Field('friend',db.person),Field('name'))
|
|
for n in range(2):
|
|
db(db.pet).delete()
|
|
db(db.person).delete()
|
|
for k in range(10):
|
|
id = db.person.insert(name=str(k))
|
|
db.pet.insert(friend=id,name=str(k))
|
|
db.commit()
|
|
stream = StringIO()
|
|
db.export_to_csv_file(stream)
|
|
db(db.pet).delete()
|
|
db(db.person).delete()
|
|
stream = StringIO(stream.getvalue())
|
|
db.import_from_csv_file(stream)
|
|
assert db(db.person).count()==10
|
|
assert db(db.pet.name).count()==10
|
|
drop(db.pet)
|
|
drop(db.person)
|
|
db.commit()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP test")
|
|
class TestImportExportUuidFields(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('person', Field('name'),Field('uuid'))
|
|
db.define_table('pet',Field('friend',db.person),Field('name'))
|
|
for n in range(2):
|
|
db(db.pet).delete()
|
|
db(db.person).delete()
|
|
for k in range(10):
|
|
id = db.person.insert(name=str(k),uuid=str(k))
|
|
db.pet.insert(friend=id,name=str(k))
|
|
db.commit()
|
|
stream = StringIO()
|
|
db.export_to_csv_file(stream)
|
|
db(db.person).delete()
|
|
db(db.pet).delete()
|
|
stream = StringIO(stream.getvalue())
|
|
db.import_from_csv_file(stream)
|
|
assert db(db.person).count()==10
|
|
assert db(db.pet).count()==10
|
|
drop(db.pet)
|
|
drop(db.person)
|
|
db.commit()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "Skip IMAP test")
|
|
class TestDALDictImportExport(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table('person', Field('name', default="Michael"),Field('uuid'))
|
|
db.define_table('pet',Field('friend',db.person),Field('name'))
|
|
dbdict = db.as_dict(flat=True, sanitize=False)
|
|
assert isinstance(dbdict, dict)
|
|
uri = dbdict["uri"]
|
|
assert isinstance(uri, basestring) and uri
|
|
assert len(dbdict["tables"]) == 2
|
|
assert len(dbdict["tables"][0]["fields"]) == 3
|
|
assert dbdict["tables"][0]["fields"][1]["type"] == db.person.name.type
|
|
assert dbdict["tables"][0]["fields"][1]["default"] == db.person.name.default
|
|
|
|
db2 = DAL(**dbdict)
|
|
assert len(db.tables) == len(db2.tables)
|
|
assert hasattr(db2, "pet") and isinstance(db2.pet, Table)
|
|
assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field)
|
|
drop(db.pet)
|
|
db.commit()
|
|
|
|
db2.commit()
|
|
|
|
have_serializers = True
|
|
try:
|
|
import serializers
|
|
dbjson = db.as_json(sanitize=False)
|
|
assert isinstance(dbjson, basestring) and len(dbjson) > 0
|
|
|
|
unicode_keys = True
|
|
if sys.version < "2.6.5":
|
|
unicode_keys = False
|
|
db3 = DAL(**serializers.loads_json(dbjson,
|
|
unicode_keys=unicode_keys))
|
|
assert hasattr(db3, "person") and hasattr(db3.person, "uuid") and\
|
|
db3.person.uuid.type == db.person.uuid.type
|
|
drop(db3.person)
|
|
db3.commit()
|
|
db3.close()
|
|
except ImportError:
|
|
pass
|
|
|
|
mpfc = "Monty Python's Flying Circus"
|
|
dbdict4 = {"uri": DEFAULT_URI,
|
|
"tables":[{"tablename": "tvshow",
|
|
"fields": [{"fieldname": "name",
|
|
"default":mpfc},
|
|
{"fieldname": "rating",
|
|
"type":"double"}]},
|
|
{"tablename": "staff",
|
|
"fields": [{"fieldname": "name",
|
|
"default":"Michael"},
|
|
{"fieldname": "food",
|
|
"default":"Spam"},
|
|
{"fieldname": "tvshow",
|
|
"type": "reference tvshow"}]}]}
|
|
db4 = DAL(**dbdict4)
|
|
assert "staff" in db4.tables
|
|
assert "name" in db4.staff
|
|
assert db4.tvshow.rating.type == "double"
|
|
assert (isinstance(db4.tvshow.insert(), long), isinstance(db4.tvshow.insert(name="Loriot"), long),
|
|
isinstance(db4.tvshow.insert(name="Il Mattatore"), long)) == (True, True, True)
|
|
assert isinstance(db4(db4.tvshow).select().first().id, long) == True
|
|
assert db4(db4.tvshow).select().first().name == mpfc
|
|
|
|
drop(db4.staff)
|
|
drop(db4.tvshow)
|
|
db4.commit()
|
|
|
|
dbdict5 = {"uri": DEFAULT_URI}
|
|
db5 = DAL(**dbdict5)
|
|
assert db5.tables in ([], None)
|
|
assert not (str(db5) in ("", None))
|
|
|
|
dbdict6 = {"uri": DEFAULT_URI,
|
|
"tables":[{"tablename": "staff"},
|
|
{"tablename": "tvshow",
|
|
"fields": [{"fieldname": "name"},
|
|
{"fieldname": "rating", "type":"double"}
|
|
]
|
|
}]
|
|
}
|
|
db6 = DAL(**dbdict6)
|
|
|
|
assert len(db6["staff"].fields) == 1
|
|
assert "name" in db6["tvshow"].fields
|
|
|
|
assert db6.staff.insert() is not None
|
|
assert isinstance(db6(db6.staff).select().first().id, long) == True
|
|
|
|
|
|
drop(db6.staff)
|
|
drop(db6.tvshow)
|
|
db6.commit()
|
|
db.close()
|
|
db2.close()
|
|
db4.close()
|
|
db5.close()
|
|
db6.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestSelectAsDict(unittest.TestCase):
|
|
|
|
def testSelect(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db.define_table(
|
|
'a_table',
|
|
Field('b_field'),
|
|
Field('a_field'),
|
|
)
|
|
db.a_table.insert(a_field="aa1", b_field="bb1")
|
|
rtn = db(db.a_table).select(db.a_table.id, db.a_table.b_field, db.a_table.a_field).as_list()
|
|
self.assertEqual(rtn[0]['b_field'], 'bb1')
|
|
keys = rtn[0].keys()
|
|
self.assertEqual(len(keys), 3)
|
|
self.assertEqual(("id" in keys, "b_field" in keys, "a_field" in keys), (True, True, True))
|
|
drop(db.a_table)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestRNameTable(unittest.TestCase):
|
|
#tests for highly experimental rname attribute
|
|
def testSelect(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
rname = _quote(db, 'a very complicated tablename')
|
|
db.define_table(
|
|
'easy_name',
|
|
Field('a_field'),
|
|
rname=rname
|
|
)
|
|
rtn = db.easy_name.insert(a_field='a')
|
|
self.assertEqual(isinstance(rtn.id, long), True)
|
|
rtn = db(db.easy_name.a_field == 'a').select()
|
|
self.assertEqual(len(rtn), 1)
|
|
self.assertEqual(isinstance(rtn[0].id, long), True)
|
|
self.assertEqual(rtn[0].a_field, 'a')
|
|
db.easy_name.insert(a_field='b')
|
|
self.assertEqual(db(db.easy_name).count(), 2)
|
|
rtn = db(db.easy_name.a_field == 'a').update(a_field='c')
|
|
self.assertEqual(rtn, 1)
|
|
|
|
#clean up
|
|
drop(db.easy_name)
|
|
db.close()
|
|
|
|
|
|
@unittest.skip("JOIN queries are not supported")
|
|
def testJoin(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
rname = _quote(db, 'this is table t1')
|
|
rname2 = _quote(db, 'this is table t2')
|
|
db.define_table('t1', Field('aa'), rname=rname)
|
|
db.define_table('t2', Field('aa'), Field('b', db.t1), rname=rname2)
|
|
i1 = db.t1.insert(aa='1')
|
|
i2 = db.t1.insert(aa='2')
|
|
i3 = db.t1.insert(aa='3')
|
|
db.t2.insert(aa='4', b=i1)
|
|
db.t2.insert(aa='5', b=i2)
|
|
db.t2.insert(aa='6', b=i2)
|
|
self.assertEqual(len(db(db.t1.id
|
|
== db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)), 3)
|
|
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)[2].t1.aa, '2')
|
|
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)[2].t2.aa, '6')
|
|
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)), 4)
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
|
|
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa, groupby=db.t1.aa)),
|
|
3)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
|
|
1)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
|
|
2)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
|
|
0)
|
|
drop(db.t2)
|
|
drop(db.t1)
|
|
|
|
db.define_table('person',Field('name'), rname=rname)
|
|
id = db.person.insert(name="max")
|
|
self.assertEqual(id.name,'max')
|
|
db.define_table('dog',Field('name'),Field('ownerperson','reference person'), rname=rname2)
|
|
db.dog.insert(name='skipper',ownerperson=1)
|
|
row = db(db.person.id==db.dog.ownerperson).select().first()
|
|
self.assertEqual(row[db.person.name],'max')
|
|
self.assertEqual(row['person.name'],'max')
|
|
drop(db.dog)
|
|
self.assertEqual(len(db.person._referenced_by),0)
|
|
drop(db.person)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
@unittest.skipIf(IS_GAE, 'TODO: Datastore AGGREGATE Not Supported')
|
|
class TestRNameFields(unittest.TestCase):
|
|
# tests for highly experimental rname attribute
|
|
def testSelect(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
rname = _quote(db, 'a very complicated fieldname')
|
|
rname2 = _quote(db, 'rating from 1 to 10')
|
|
db.define_table(
|
|
'easy_name',
|
|
Field('a_field', rname=rname),
|
|
Field('rating', 'integer', rname=rname2, default=2)
|
|
)
|
|
rtn = db.easy_name.insert(a_field='a')
|
|
self.assertEqual(isinstance(rtn.id, long), True)
|
|
rtn = db(db.easy_name.a_field == 'a').select()
|
|
self.assertEqual(len(rtn), 1)
|
|
self.assertEqual(isinstance(rtn[0].id, long), True)
|
|
self.assertEqual(rtn[0].a_field, 'a')
|
|
db.easy_name.insert(a_field='b')
|
|
rtn = db(db.easy_name.id > 0).delete()
|
|
self.assertEqual(rtn, 2)
|
|
rtn = db(db.easy_name.id > 0).count()
|
|
self.assertEqual(rtn, 0)
|
|
db.easy_name.insert(a_field='a')
|
|
db.easy_name.insert(a_field='b')
|
|
rtn = db(db.easy_name.id > 0).count()
|
|
self.assertEqual(rtn, 2)
|
|
rtn = db(db.easy_name.a_field == 'a').update(a_field='c')
|
|
rtn = db(db.easy_name.a_field == 'c').count()
|
|
self.assertEqual(rtn, 1)
|
|
rtn = db(db.easy_name.a_field != 'c').count()
|
|
self.assertEqual(rtn, 1)
|
|
avg = db.easy_name.rating.avg()
|
|
rtn = db(db.easy_name.id > 0).select(avg)
|
|
self.assertEqual(rtn[0][avg], 2)
|
|
|
|
rname = _quote(db, 'this is the person name')
|
|
db.define_table(
|
|
'person',
|
|
Field('name', default="Michael", rname=rname),
|
|
Field('uuid')
|
|
)
|
|
michael = db.person.insert() #default insert
|
|
john = db.person.insert(name='John')
|
|
luke = db.person.insert(name='Luke')
|
|
|
|
rtn = db(db.person.id > 0).select()
|
|
self.assertEqual(len(rtn), 3)
|
|
self.assertEqual(rtn[0].id, michael)
|
|
self.assertEqual(rtn[0].name, 'Michael')
|
|
self.assertEqual(rtn[1].id, john)
|
|
self.assertEqual(rtn[1].name, 'John')
|
|
#fetch owners, eventually with pet
|
|
#main point is retrieving Luke with no pets
|
|
rtn = db(db.person.id > 0).select()
|
|
self.assertEqual(rtn[0].id, michael)
|
|
self.assertEqual(rtn[0].name, 'Michael')
|
|
self.assertEqual(rtn[2].name, 'Luke')
|
|
self.assertEqual(rtn[2].id, luke)
|
|
#as dict
|
|
rtn = db(db.person.id > 0).select().as_dict()
|
|
self.assertEqual(rtn[michael]['name'], 'Michael')
|
|
#as list
|
|
rtn = db(db.person.id > 0).select().as_list()
|
|
self.assertEqual(rtn[0]['name'], 'Michael')
|
|
#isempty
|
|
rtn = db(db.person.id > 0).isempty()
|
|
self.assertEqual(rtn, False)
|
|
|
|
#clean up
|
|
drop(db.person)
|
|
drop(db.easy_name)
|
|
db.close()
|
|
|
|
@unittest.skipIf(IS_GAE, 'TODO: Datastore does not accept dict objects as json field input.')
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
rname = _quote(db, 'a very complicated fieldname')
|
|
for ft in ['string', 'text', 'password', 'upload', 'blob']:
|
|
db.define_table('tt', Field('aa', ft, default='', rname=rname))
|
|
cv = 'x'
|
|
self.assertEqual(isinstance(db.tt.insert(aa='x'), long), True)
|
|
if IS_MONGODB and not PY2 and ft == 'blob':
|
|
cv = to_bytes(cv)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, cv)
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'integer', default=1, rname=rname))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=3), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'double', default=1, rname=rname))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=3.1), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'boolean', default=True, rname=rname))
|
|
self.assertEqual(isinstance(db.tt.insert(aa=True), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, True)
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'json', default={}, rname=rname))
|
|
self.assertEqual(isinstance(db.tt.insert(aa={}), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, {})
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'date',
|
|
default=datetime.date.today(), rname=rname))
|
|
t0 = datetime.date.today()
|
|
self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'datetime',
|
|
default=datetime.datetime.today(), rname=rname))
|
|
t0 = datetime.datetime(
|
|
1971,
|
|
12,
|
|
21,
|
|
10,
|
|
30,
|
|
55,
|
|
0,
|
|
)
|
|
id = db.tt.insert(aa=t0)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
|
|
|
|
## Row APIs
|
|
row = db().select(db.tt.aa)[0]
|
|
self.assertEqual(db.tt[id].aa,t0)
|
|
self.assertEqual(db.tt['aa'],db.tt.aa)
|
|
self.assertEqual(db.tt(id).aa,t0)
|
|
self.assertTrue(db.tt(id,aa=None)==None)
|
|
self.assertFalse(db.tt(id,aa=t0)==None)
|
|
self.assertEqual(row.aa,t0)
|
|
self.assertEqual(row['aa'],t0)
|
|
self.assertEqual(row['tt.aa'],t0)
|
|
self.assertEqual(row('tt.aa'),t0)
|
|
|
|
## Lazy and Virtual fields
|
|
db.tt.b = Field.Virtual(lambda row: row.tt.aa)
|
|
db.tt.c = Field.Lazy(lambda row: row.tt.aa)
|
|
row = db().select(db.tt.aa)[0]
|
|
self.assertEqual(row.b,t0)
|
|
self.assertEqual(row.c(),t0)
|
|
|
|
drop(db.tt)
|
|
db.define_table('tt', Field('aa', 'time', default='11:30', rname=rname))
|
|
t0 = datetime.time(10, 30, 55)
|
|
self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True)
|
|
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
def testInsert(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
rname = _quote(db, 'a very complicated fieldname')
|
|
db.define_table('tt', Field('aa', rname=rname))
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
|
|
self.assertEqual(db(db.tt.aa == '1').count(), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
|
|
self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').count(), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').isempty(), False)
|
|
self.assertEqual(db(db.tt.aa == '2').delete(), 3)
|
|
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
|
|
drop(db.tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skip("JOIN queries are not supported")
|
|
def testJoin(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
rname = _quote(db, 'this is field aa')
|
|
rname2 = _quote(db, 'this is field b')
|
|
db.define_table('t1', Field('aa', rname=rname))
|
|
db.define_table('t2', Field('aa', rname=rname), Field('b', db.t1, rname=rname2))
|
|
i1 = db.t1.insert(aa='1')
|
|
i2 = db.t1.insert(aa='2')
|
|
i3 = db.t1.insert(aa='3')
|
|
db.t2.insert(aa='4', b=i1)
|
|
db.t2.insert(aa='5', b=i2)
|
|
db.t2.insert(aa='6', b=i2)
|
|
self.assertEqual(len(db(db.t1.id
|
|
== db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)), 3)
|
|
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)[2].t1.aa, '2')
|
|
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
|
|
| db.t2.aa)[2].t2.aa, '6')
|
|
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)), 4)
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
|
|
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
|
|
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa, groupby=db.t1.aa)),
|
|
3)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
|
|
1)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
|
|
2)
|
|
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
|
|
left=db.t2.on(db.t1.id == db.t2.b),
|
|
orderby=db.t1.aa,
|
|
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
|
|
0)
|
|
drop(db.t2)
|
|
drop(db.t1)
|
|
|
|
db.define_table('person',Field('name', rname=rname))
|
|
id = db.person.insert(name="max")
|
|
self.assertEqual(id.name,'max')
|
|
db.define_table('dog',Field('name', rname=rname),Field('ownerperson','reference person', rname=rname2))
|
|
db.dog.insert(name='skipper',ownerperson=1)
|
|
row = db(db.person.id==db.dog.ownerperson).select().first()
|
|
self.assertEqual(row[db.person.name],'max')
|
|
self.assertEqual(row['person.name'],'max')
|
|
drop(db.dog)
|
|
self.assertEqual(len(db.person._referenced_by),0)
|
|
drop(db.person)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestQuoting(unittest.TestCase):
|
|
|
|
# tests for case sensitivity
|
|
def testCase(self):
|
|
return
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=False)
|
|
|
|
# test table case
|
|
t0 = db.define_table('B',
|
|
Field('f', 'string'))
|
|
|
|
t1 = db.define_table('b',
|
|
Field('B', t0),
|
|
Field('words', 'text'))
|
|
|
|
blather = 'blah blah and so'
|
|
t0[0] = {'f': 'content'}
|
|
t1[0] = {'B': int(t0[1]['id']),
|
|
'words': blather}
|
|
|
|
r = db(db.B.id==db.b.B).select()
|
|
|
|
self.assertEqual(r[0].b.words, blather)
|
|
|
|
drop(t1)
|
|
drop(t0)
|
|
|
|
# test field case
|
|
t0 = db.define_table('table is a test',
|
|
Field('a_a'),
|
|
Field('a_A'))
|
|
|
|
t0[0] = dict(a_a='a_a', a_A='a_A')
|
|
|
|
self.assertEqual(t0[1].a_a, 'a_a')
|
|
self.assertEqual(t0[1].a_A, 'a_A')
|
|
|
|
drop(t0)
|
|
db.close()
|
|
|
|
def testPKFK(self):
|
|
|
|
# test primary keys
|
|
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=False)
|
|
# test table without surrogate key. Length must is limited to
|
|
# 100 because of MySQL limitations: it cannot handle more than
|
|
# 767 bytes in unique keys.
|
|
|
|
t0 = db.define_table('t0', Field('Code', length=100), primarykey=['Code'])
|
|
t2 = db.define_table('t2', Field('f'), Field('t0_Code', 'reference t0'))
|
|
t3 = db.define_table('t3', Field('f', length=100), Field('t0_Code', t0.Code), primarykey=['f'])
|
|
t4 = db.define_table('t4', Field('f', length=100), Field('t0', t0), primarykey=['f'])
|
|
|
|
try:
|
|
t5 = db.define_table('t5', Field('f', length=100), Field('t0', 'reference no_table_wrong_reference'), primarykey=['f'])
|
|
except Exception as e:
|
|
self.assertTrue(isinstance(e, KeyError))
|
|
|
|
drop(t0, 'cascade')
|
|
drop(t2)
|
|
drop(t3)
|
|
drop(t4)
|
|
db.close()
|
|
|
|
|
|
class TestTableAndFieldCase(unittest.TestCase):
|
|
"""
|
|
at the Python level we should not allow db.C and db.c because of .table conflicts on windows
|
|
but it should be possible to map two different names into distinct tables "c" and "C" at the Python level
|
|
By default Python models names should be mapped into lower case table names and assume case insensitivity.
|
|
"""
|
|
def testme(self):
|
|
return
|
|
|
|
|
|
class TestQuotesByDefault(unittest.TestCase):
|
|
"""
|
|
all default tables names should be quoted unless an explicit mapping has been given for a table.
|
|
"""
|
|
def testme(self):
|
|
return
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestRecordVersioning(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
tt = db.define_table('tt', Field('name'),
|
|
Field('is_active', 'boolean', default=True))
|
|
db.tt._enable_record_versioning(archive_name='tt_archive')
|
|
self.assertTrue('tt_archive' in db)
|
|
i_id = db.tt.insert(name='web2py1')
|
|
db.tt.insert(name='web2py2')
|
|
db(db.tt.name == 'web2py2').delete()
|
|
self.assertEqual(len(db(db.tt).select()), 1)
|
|
self.assertEqual(db(db.tt).count(), 1)
|
|
db(db.tt.id == i_id).update(name='web2py3')
|
|
self.assertEqual(len(db(db.tt).select()), 1)
|
|
self.assertEqual(db(db.tt).count(), 1)
|
|
self.assertEqual(len(db(db.tt_archive).select()), 2)
|
|
self.assertEqual(db(db.tt_archive).count(), 2)
|
|
drop(db.tt_archive)
|
|
# it allows tt to be dropped
|
|
db.tt._before_delete = []
|
|
drop(tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestConnection(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
# check for adapter reconnect without parameters
|
|
db1 = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
db1.define_table('tt', Field('aa', 'integer'))
|
|
self.assertEqual(isinstance(db1.tt.insert(aa=1), long), True)
|
|
self.assertEqual(db1(db1.tt.aa == 1).count(), 1)
|
|
drop(db1.tt)
|
|
db1._adapter.close()
|
|
db1._adapter.reconnect()
|
|
db1.define_table('tt', Field('aa', 'integer'))
|
|
self.assertEqual(isinstance(db1.tt.insert(aa=1), long), True)
|
|
self.assertEqual(db1(db1.tt.aa == 1).count(), 1)
|
|
drop(db1.tt)
|
|
db1.close()
|
|
|
|
# check connection are reused with pool_size
|
|
connections = {}
|
|
for a in range(10):
|
|
db2 = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
|
|
c = db2._adapter.connection
|
|
connections[id(c)] = c
|
|
db2.close()
|
|
self.assertEqual(len(connections), 1)
|
|
c = [connections[x] for x in connections][0]
|
|
c.commit()
|
|
c.close()
|
|
|
|
# check correct use of pool_size
|
|
dbs = []
|
|
for a in range(10):
|
|
db3 = DAL(DEFAULT_URI, check_reserved=['all'], pool_size=5)
|
|
dbs.append(db3)
|
|
for db in dbs:
|
|
db.close()
|
|
self.assertEqual(len(db3._adapter.POOLS[DEFAULT_URI]), 5)
|
|
for c in db3._adapter.POOLS[DEFAULT_URI]:
|
|
c.close()
|
|
db3._adapter.POOLS[DEFAULT_URI] = []
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestBasicOps(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
tt = db.define_table('tt', Field('name'),
|
|
Field('is_active', 'boolean', default=True))
|
|
i_id = db.tt.insert(name='web2py1')
|
|
db.tt.insert(name='web2py2')
|
|
db(db.tt.name == 'web2py2').delete()
|
|
self.assertEqual(len(db(db.tt).select()), 1)
|
|
self.assertEqual(db(db.tt).count(), 1)
|
|
db(db.tt.id == i_id).update(name='web2py3')
|
|
self.assertEqual(len(db(db.tt).select()), 1)
|
|
self.assertEqual(db(db.tt).count(), 1)
|
|
drop(tt)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
@unittest.skipIf(IS_GAE, 'TODO: Datastore "unsupported operand type"')
|
|
class TestSQLCustomType(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
from pydal.helpers.classes import SQLCustomType
|
|
native_double = "double"
|
|
native_string = "string"
|
|
if hasattr(db._adapter, 'types'):
|
|
native_double = db._adapter.types['double']
|
|
try:
|
|
native_string = db._adapter.types['string'] % {'length': 256}
|
|
except:
|
|
native_string = db._adapter.types['string']
|
|
basic_t = SQLCustomType(type = "double", native = native_double)
|
|
basic_t_str = SQLCustomType(type = "string", native = native_string)
|
|
t0=db.define_table('t0', Field("price", basic_t), Field("product", basic_t_str))
|
|
r_id = t0.insert(price=None, product=None)
|
|
row = db(t0.id == r_id).select(t0.ALL).first()
|
|
self.assertEqual(row['price'], None)
|
|
self.assertEqual(row['product'], None)
|
|
r_id = t0.insert(price=1.2, product="car")
|
|
row=db(t0.id == r_id).select(t0.ALL).first()
|
|
self.assertEqual(row['price'], 1.2)
|
|
self.assertEqual(row['product'], 'car')
|
|
t0.drop()
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_GAE or IS_IMAP, "Skip test lazy")
|
|
class TestLazy(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], lazy_tables=True)
|
|
t0 = db.define_table('t0', Field('name'))
|
|
self.assertTrue(('t0' in db._LAZY_TABLES.keys()))
|
|
db.t0.insert(name='1')
|
|
self.assertFalse(('t0' in db._LAZY_TABLES.keys()))
|
|
db.t0.drop()
|
|
db.close()
|
|
|
|
def testLazyGetter(self):
|
|
db=DAL(DEFAULT_URI, lazy_tables=True)
|
|
db.define_table('tt', Field('value', 'integer'))
|
|
db.define_table('ttt',
|
|
Field('value', 'integer'),
|
|
Field('tt_id', 'reference tt'),
|
|
)
|
|
# Force table definition
|
|
db.ttt.value.writable=False
|
|
idd=db.tt.insert(value=0)
|
|
db.ttt.insert(tt_id=idd)
|
|
db.ttt.drop()
|
|
db.tt.drop()
|
|
db.close()
|
|
|
|
def testRowNone(self):
|
|
db=DAL(DEFAULT_URI, lazy_tables=True)
|
|
tt = db.define_table('tt', Field('value', 'integer'))
|
|
db.tt.insert(value=None)
|
|
row = db(db.tt).select(db.tt.ALL).first()
|
|
self.assertEqual(row.value, None)
|
|
self.assertEqual(row[db.tt.value], None)
|
|
self.assertEqual(row['tt.value'], None)
|
|
self.assertEqual(row.get('tt.value'), None)
|
|
self.assertEqual(row['value'], None)
|
|
self.assertEqual(row.get('value'), None)
|
|
db.tt.drop()
|
|
db.close()
|
|
|
|
|
|
class TestRedefine(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'], lazy_tables=True, migrate=False)
|
|
db.define_table('t_a', Field('code'))
|
|
self.assertTrue('code' in db.t_a)
|
|
self.assertTrue('code' in db['t_a'])
|
|
db.define_table('t_a', Field('code_a'), redefine=True)
|
|
self.assertFalse('code' in db.t_a)
|
|
self.assertFalse('code' in db['t_a'])
|
|
self.assertTrue('code_a' in db.t_a)
|
|
self.assertTrue('code_a' in db['t_a'])
|
|
db.close()
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestUpdateInsert(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
t0 = db.define_table('t0', Field('name'))
|
|
i_id = db.t0.update_or_insert((db.t0.id == 1), name='web2py')
|
|
u_id = db.t0.update_or_insert((db.t0.id == i_id), name='web2py2')
|
|
self.assertTrue(i_id != None)
|
|
self.assertTrue(u_id == None)
|
|
self.assertEqual(len(db(db.t0).select()), 1)
|
|
self.assertEqual(db(db.t0).count(), 1)
|
|
self.assertEqual(db(db.t0.name == 'web2py').count(), 0)
|
|
self.assertEqual(db(db.t0.name == 'web2py2').count(), 1)
|
|
drop(t0)
|
|
db.close()
|
|
|
|
|
|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
|
|
class TestBulkInsert(unittest.TestCase):
|
|
|
|
def testRun(self):
|
|
db = DAL(DEFAULT_URI, check_reserved=['all'])
|
|
t0 = db.define_table('t0', Field('name'))
|
|
global ctr
|
|
ctr = 0
|
|
def test_after_insert(i, r):
|
|
self.assertIsInstance(i, OpRow)
|
|
global ctr
|
|
ctr += 1
|
|
return True
|
|
t0._after_insert.append(test_after_insert)
|
|
items = [{'name':'web2py_%s' % pos} for pos in range(0, 10, 1)]
|
|
t0.bulk_insert(items)
|
|
self.assertTrue(db(t0).count() == len(items))
|
|
for pos in range(0, 10, 1):
|
|
self.assertEqual(len(db(t0.name == 'web2py_%s' % pos).select()), 1)
|
|
self.assertEqual(db(t0.name == 'web2py_%s' % pos).count(), 1)
|
|
self.assertTrue(ctr == len(items))
|
|
drop(t0)
|
|
db.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
tearDownModule()
|