506 lines
16 KiB
Python
506 lines
16 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
| This file is part of the web2py Web Framework
|
|
| Developed by Massimo Di Pierro <mdipierro@cs.depaul.edu>,
|
|
| limodou <limodou@gmail.com> and srackham <srackham@gmail.com>.
|
|
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
|
|
|
|
Web2py environment in the shell
|
|
--------------------------------
|
|
"""
|
|
from __future__ import print_function
|
|
|
|
import os
|
|
import sys
|
|
import code
|
|
import copy
|
|
import logging
|
|
import types
|
|
import re
|
|
import optparse
|
|
import glob
|
|
import traceback
|
|
import gluon.fileutils as fileutils
|
|
from gluon.settings import global_settings
|
|
from gluon.utils import web2py_uuid
|
|
from gluon.compileapp import build_environment, read_pyc, run_models_in
|
|
from gluon.restricted import RestrictedError
|
|
from gluon.globals import Request, Response, Session
|
|
from gluon.storage import Storage, List
|
|
from gluon.admin import w2p_unpack
|
|
from pydal.base import BaseAdapter
|
|
from gluon._compat import iteritems, ClassType, PY2
|
|
|
|
logger = logging.getLogger("web2py")

if not PY2:
    def execfile(filename, global_vars=None, local_vars=None):
        """Python 3 replacement for the ``execfile`` builtin of Python 2.

        Compiles the content of *filename* and executes it with the given
        global and local namespaces (both are handed to ``exec`` unchanged,
        so ``None`` means "use the namespaces of this function's frame").
        """
        # Read raw bytes: compile() then decodes the source itself, honoring
        # a BOM or a "# -*- coding: ... -*-" line instead of relying on the
        # locale's default text encoding.
        with open(filename, 'rb') as f:
            code_obj = compile(f.read(), filename, 'exec')
        exec(code_obj, global_vars, local_vars)

    # Python 3 renamed raw_input() to input().
    raw_input = input
|
|
|
|
|
|
def enable_autocomplete_and_history(adir, env):
    """Enable tab completion and a persistent history for the plain shell.

    The history is kept in ``<adir>/.pythonhistory`` and completion looks
    names up in the *env* namespace. When ``rlcompleter``, ``atexit`` or
    ``readline`` cannot be imported the function does nothing.
    """
    try:
        import rlcompleter
        import atexit
        import readline
    except ImportError:
        return

    readline.parse_and_bind("tab: complete")
    histfile = os.path.join(adir, '.pythonhistory')
    try:
        readline.read_history_file(histfile)
    except IOError:
        # The history could not be read: make sure the file exists.
        with open(histfile, 'a'):
            pass
    # Save the history back to the same file when the interpreter exits.
    atexit.register(readline.write_history_file, histfile)
    completer = rlcompleter.Completer(env)
    readline.set_completer(completer.complete)
|
|
|
|
|
|
def exec_environment(
    pyfile='',
    request=None,
    response=None,
    session=None,
):
    """Build a web2py environment and optionally run a Python file in it.

    Missing ``request``, ``response`` and ``session`` objects are created
    with default constructors. When ``request.folder`` is ``None`` it is
    derived from an ``applications/<appname>`` component found in *pyfile*,
    or set to ``''`` when *pyfile* contains no such component.

    If *pyfile* is given, its compiled sibling (``pyfile + 'c'``) is executed
    when that file exists, otherwise the source file itself is executed.

    Returns a Storage wrapping the resulting environment.
    The working directory must be web2py root -- this is the web2py default.
    """
    request = Request({}) if request is None else request
    response = Response() if response is None else response
    session = Session() if session is None else session

    if request.folder is None:
        match = re.match(r'(|.*/)applications/(?P<appname>[^/]+)', pyfile)
        if not match:
            request.folder = ''
        else:
            request.folder = os.path.abspath(
                os.path.join('applications', match.group('appname')))

    environment = build_environment(request, response, session,
                                    store_current=False)
    if pyfile:
        compiled = pyfile + 'c'
        if os.path.isfile(compiled):
            exec(read_pyc(compiled), environment)
        else:
            execfile(pyfile, environment)
    return Storage(environment)
|
|
|
|
|
|
def env(
    a,
    import_models=False,
    c=None,
    f=None,
    dir='',
    extra_request=None,
):
    """
    Returns web2py execution environment for application (a), controller (c),
    function (f).
    If import_models is True then exec all application models into the
    environment.

    dir overrides the application folder (default: ``applications/<a>``).

    extra_request allows you to pass along any extra variables to the request
    object before your models get executed. This was mainly done to support
    web2py_utils.test_runner, however you can use it with any wrapper scripts
    that need access to the web2py environment.

    If running the models raises RestrictedError, its traceback is written
    to stderr and the process exits with status 1.
    """

    request = Request({})
    response = Response()
    session = Session()
    request.application = a

    # Populate the dummy environment with sensible defaults.

    request.folder = dir if dir else os.path.join('applications', a)
    request.controller = c or 'default'
    request.function = f or 'index'
    response.view = '%s/%s.html' % (request.controller,
                                    request.function)
    if global_settings.cmd_options:
        ip = global_settings.cmd_options.ip
        port = global_settings.cmd_options.port
        request.is_shell = global_settings.cmd_options.shell is not None
        request.is_scheduler = global_settings.cmd_options.scheduler is not None
    else:
        ip, port = '127.0.0.1', '8000'
    request.env.http_host = '%s:%s' % (ip, port)
    request.env.remote_addr = '127.0.0.1'
    request.env.web2py_runtime_gae = global_settings.web2py_runtime_gae

    # None instead of a shared mutable default; an omitted value behaves
    # exactly like the former empty dict.
    for k, v in (extra_request or {}).items():
        setattr(request, k, v)

    # Use the resolved controller/function so that omitted c/f yield
    # '/app/default/index' rather than the literal '/app/None/None'.
    path_info = '/%s/%s/%s' % (a, request.controller, request.function)
    if request.args:
        path_info = '%s/%s' % (path_info, '/'.join(request.args))
    if request.vars:
        query = ['%s=%s' % (k, v) if v else '%s' % k
                 for (k, v) in iteritems(request.vars)]
        path_info = '%s?%s' % (path_info, '&'.join(query))
    request.env.path_info = path_info

    # Monkey patch so credentials checks pass.

    def check_credentials(request, other_application='admin'):
        return True

    fileutils.check_credentials = check_credentials

    environment = build_environment(request, response, session)

    if import_models:
        try:
            run_models_in(environment)
        except RestrictedError as e:
            sys.stderr.write(e.traceback + '\n')
            sys.exit(1)

    response._view_environment = copy.copy(environment)

    environment['__name__'] = '__main__'
    return environment
|
|
|
|
|
|
def exec_pythonrc():
    """Execute the ``PYTHONSTARTUP`` file and return the names it defines.

    Returns an empty dict when the environment variable is unset, when it
    does not point to an existing file, or when the startup file raises
    NameError (that error is deliberately swallowed, as before).
    """
    pythonrc = os.environ.get('PYTHONSTARTUP')
    if not (pythonrc and os.path.isfile(pythonrc)):
        return dict()

    # Run the file in an explicit namespace. Relying on locals() of a helper
    # frame only works with the Python 2 execfile builtin; with a function
    # based execfile the definitions end up in another frame and are lost.
    namespace = {}
    try:
        with open(pythonrc, 'rb') as rcfile:
            rc_code = compile(rcfile.read(), pythonrc, 'exec')
        exec(rc_code, namespace)
    except NameError:
        return dict()
    # exec() injects __builtins__ into the globals dict; do not leak it into
    # the environment the caller merges this result into.
    namespace.pop('__builtins__', None)
    return namespace
|
|
|
|
|
|
def run(
    appname,
    plain=False,
    import_models=False,
    startfile=None,
    bpython=False,
    python_code=False,
    cronjob=False):
    """
    Start interactive shell or run Python script (startfile) in web2py
    controller environment. appname is formatted like:

    - a : web2py application name
    - a/c : exec the controller c into the application environment

    If the application folder does not exist the user is asked whether it
    should be created from ``welcome.w2p``; when stdin is missing or is
    ``/dev/null`` a warning is logged and the function returns.

    When a function name is also given (a/c/f) the function is called, its
    result printed, and no shell is started.

    Otherwise startfile is executed if given, else python_code, else an
    interactive shell is opened: bpython when ``bpython`` is true, IPython
    otherwise, falling back to the standard ``code.interact`` shell.
    ``plain`` skips bpython/IPython. ``cronjob`` prefers the compiled
    controller when it exists.
    """

    (a, c, f, args, vars) = parse_path_info(appname, av=True)
    errmsg = 'invalid application name: %s' % appname
    if not a:
        die(errmsg)
    adir = os.path.join('applications', a)

    if not os.path.exists(adir):
        if sys.stdin and not sys.stdin.name == '/dev/null':
            confirm = raw_input(
                'application %s does not exist, create (y/n)?' % a)
        else:
            # logging.warn is a deprecated alias (removed in Python 3.13).
            logger.warning(
                'application does not exist and will not be created')
            return
        if confirm.lower() in ['y', 'yes']:
            os.mkdir(adir)
            w2p_unpack('welcome.w2p', adir)
            for subfolder in ['models', 'views', 'controllers', 'databases',
                              'modules', 'cron', 'errors', 'sessions',
                              'languages', 'static', 'private', 'uploads']:
                subpath = os.path.join(adir, subfolder)
                if not os.path.exists(subpath):
                    os.mkdir(subpath)
            db = os.path.join(adir, 'models/db.py')
            if os.path.exists(db):
                # Replace the placeholder with a freshly generated key.
                data = fileutils.read_file(db)
                data = data.replace(
                    '<your secret key>', 'sha512:' + web2py_uuid())
                fileutils.write_file(db, data)

    if c:
        import_models = True
    extra_request = {}
    if args:
        extra_request['args'] = args
    if vars:
        # underscore necessary because request.vars is a property
        extra_request['_vars'] = vars
    _env = env(a, c=c, f=f, import_models=import_models,
               extra_request=extra_request)
    if c:
        pyfile = os.path.join('applications', a, 'controllers', c + '.py')
        pycfile = os.path.join('applications', a, 'compiled',
                               "controllers_%s_%s.pyc" % (c, f))
        has_source = os.path.isfile(pyfile)
        # Use the compiled controller for cron jobs or when there is no
        # source, but only if it really exists; when neither file is present
        # fall through to die() instead of failing inside read_pyc().
        if os.path.isfile(pycfile) and (cronjob or not has_source):
            exec(read_pyc(pycfile), _env)
        elif has_source:
            execfile(pyfile, _env)
        else:
            die(errmsg)

    if f:
        exec('print( %s())' % f, _env)
        return

    _env.update(exec_pythonrc())
    if startfile:
        try:
            if startfile.endswith('.pyc'):
                exec(read_pyc(startfile), _env)
            else:
                execfile(startfile, _env)

            if import_models:
                BaseAdapter.close_all_instances('commit')
        except Exception:
            print(traceback.format_exc())
            if import_models:
                BaseAdapter.close_all_instances('rollback')
    elif python_code:
        try:
            exec(python_code, _env)
            if import_models:
                BaseAdapter.close_all_instances('commit')
        except Exception:
            print(traceback.format_exc())
            if import_models:
                BaseAdapter.close_all_instances('rollback')
    else:
        if not plain:
            if bpython:
                try:
                    import bpython
                    bpython.embed(locals_=_env)
                    return
                except:
                    logger.warning(
                        'import bpython error; trying ipython...')
            else:
                try:
                    import IPython
                    # NOTE(review): versions are compared as plain strings,
                    # not as parsed version numbers.
                    if IPython.__version__ > '1.0.0':
                        IPython.start_ipython(user_ns=_env)
                        return
                    elif IPython.__version__ == '1.0.0':
                        from IPython.terminal.embed import InteractiveShellEmbed
                        shell = InteractiveShellEmbed(user_ns=_env)
                        shell()
                        return
                    elif IPython.__version__ >= '0.11':
                        from IPython.frontend.terminal.embed import InteractiveShellEmbed
                        shell = InteractiveShellEmbed(user_ns=_env)
                        shell()
                        return
                    else:
                        # following 2 lines fix a problem with
                        # IPython; thanks Michael Toomim
                        if '__builtins__' in _env:
                            del _env['__builtins__']
                        shell = IPython.Shell.IPShell(argv=[], user_ns=_env)
                        shell.mainloop()
                        return
                except:
                    logger.warning(
                        'import IPython error; use default python shell')
        enable_autocomplete_and_history(adir, _env)
        code.interact(local=_env)
|
|
|
|
|
|
def parse_path_info(path_info, av=False):
    """
    Parses path info formatted like a/c/f where c and f are optional
    and a leading `/` is accepted (when av is False).
    Return tuple (a, c, f). If invalid path_info a is set to None.
    If c or f are omitted they are set to None.
    If av=True, parse args and vars as well and return the tuple
    (a, c, f, args, vars): args holds the path components after a/c/f
    (None when there are none) and vars the ``name=value`` pairs found
    after the first `?` (None when there is no query string; a name
    without `=` maps to None).
    """
    if av:
        query_vars = None
        if '?' in path_info:
            # maxsplit=1: everything after the first '?' is the query
            # string, even if it contains further '?' characters.
            path_info, query = path_info.split('?', 1)
            query_vars = Storage()
            for pair in query.split('&'):
                # maxsplit=1 so that a value may itself contain '='.
                (key, val) = pair.split('=', 1) if '=' in pair else (pair, None)
                query_vars[key] = val
        items = List(path_info.split('/'))
        args = List(items[3:]) if len(items) > 3 else None
        return (items(0), items(1), items(2), args, query_vars)

    mo = re.match(r'^/?(?P<a>\w+)(/(?P<c>\w+)(/(?P<f>\w+))?)?$',
                  path_info)
    if mo:
        return (mo.group('a'), mo.group('c'), mo.group('f'))
    else:
        return (None, None, None)
|
|
|
|
|
|
def die(msg):
    """Print *msg* on stderr and terminate with exit status 1."""
    print(msg, file=sys.stderr)
    raise SystemExit(1)
|
|
|
|
|
|
def test(testpath, import_models=True, verbose=False):
    """
    Run doctests in web2py environment. testpath is formatted like:

    - a: tests all controllers in application a
    - a/c: tests controller c in application a
    - a/c/f test function f in controller c, application a

    Where a, c and f are application, controller and function names
    respectively. If the testpath is a file name the file is tested.
    If a controller is specified models are executed by default.

    An invalid test path terminates the process through die().
    """

    import doctest
    if os.path.isfile(testpath):
        mo = re.match(r'(|.*/)applications/(?P<a>[^/]+)', testpath)
        if not mo:
            die('test file is not in application directory: %s'
                % testpath)
        a = mo.group('a')
        c = f = None
        files = [testpath]
    else:
        (a, c, f) = parse_path_info(testpath)
        errmsg = 'invalid test path: %s' % testpath
        if not a:
            die(errmsg)
        cdir = os.path.join('applications', a, 'controllers')
        if not os.path.isdir(cdir):
            die(errmsg)
        if c:
            cfile = os.path.join(cdir, c + '.py')
            if not os.path.isfile(cfile):
                die(errmsg)
            files = [cfile]
        else:
            files = glob.glob(os.path.join(cdir, '*.py'))

    # types.UnboundMethodType exists on Python 2 only; fall back to
    # MethodType so that the lookup does not raise on Python 3.
    testable_types = (types.FunctionType, type, ClassType, types.MethodType,
                      getattr(types, 'UnboundMethodType', types.MethodType))

    for testfile in files:
        globs = env(a, import_models)
        # Snapshot the names present before the test file is executed. A
        # bare globs.keys() is a live view on Python 3 and would also list
        # the names defined by the test file, so nothing would be tested.
        ignores = set(globs)
        execfile(testfile, globs)

        def doctest_object(name, obj):
            """doctest obj and enclosed methods and classes."""

            if type(obj) in testable_types:

                # Reload environment before each test.

                globs = env(a, c=c, f=f, import_models=import_models)
                execfile(testfile, globs)
                doctest.run_docstring_examples(
                    obj, globs=globs,
                    name='%s: %s' % (os.path.basename(testfile),
                                     name), verbose=verbose)
                if type(obj) in (type, ClassType):
                    for attr_name in dir(obj):

                        # Execute . operator so decorators are executed.

                        o = eval('%s.%s' % (name, attr_name), globs)
                        doctest_object(attr_name, o)

        for (name, obj) in list(globs.items()):
            if name not in ignores and (f is None or f == name):
                doctest_object(name, obj)
|
|
|
|
|
|
def get_usage():
    """Return the usage template handed to the option parser."""
    return """
%prog [options] pythonfile
"""
|
|
|
|
|
|
def execute_from_command_line(argv=None):
    """Parse command line options and start the web2py shell.

    argv defaults to ``sys.argv``; its first item is the program name.
    Prints the help and exits with status 0 when no argument is given.
    The script to run is the first positional argument or, when there is
    none, the value of ``-R/--run``.
    """
    if argv is None:
        argv = sys.argv

    parser = optparse.OptionParser(usage=get_usage())

    parser.add_option('-S', '--shell', dest='shell', metavar='APPNAME',
                      help='run web2py in interactive shell ' +
                      'or IPython(if installed) with specified appname')
    msg = 'run web2py in interactive shell or bpython (if installed) with'
    msg += ' specified appname (if app does not exist it will be created).'
    msg += '\n Use combined with --shell'
    parser.add_option(
        '-B',
        '--bpython',
        action='store_true',
        default=False,
        dest='bpython',
        help=msg,
    )
    parser.add_option(
        '-P',
        '--plain',
        action='store_true',
        default=False,
        dest='plain',
        help='only use plain python shell, should be used with --shell option',
    )
    parser.add_option(
        '-M',
        '--import_models',
        action='store_true',
        default=False,
        dest='import_models',
        help='auto import model files, default is False, ' +
        ' should be used with --shell option',
    )
    parser.add_option(
        '-R',
        '--run',
        dest='run',
        metavar='PYTHON_FILE',
        default='',
        help='run PYTHON_FILE in web2py environment, ' +
        'should be used with --shell option',
    )

    (options, args) = parser.parse_args(argv[1:])

    # Check the argv actually being processed, not sys.argv, so that an
    # explicitly passed argument list is honored.
    if len(argv) == 1:
        parser.print_help()
        sys.exit(0)

    if not options.shell:
        # Without an application name run() cannot do anything useful.
        parser.error('an application name is required, use -S APPNAME')

    # The positional argument wins; -R/--run is used when there is none
    # (its default is '', i.e. no start file).
    startfile = args[0] if args else options.run
    run(options.shell, options.plain,
        import_models=options.import_models,
        startfile=startfile,
        bpython=options.bpython)
|
|
|
|
|
|
# Script entry point: parse sys.argv and start the shell.
if __name__ == '__main__':
    execute_from_command_line()
|