#!/usr/bin/env python # -*- coding: utf-8 -*- """ | This file is part of the web2py Web Framework | Copyrighted by Massimo Di Pierro | License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) File operations --------------- """ from gluon import storage import os import re import tarfile import glob import time import datetime import logging from gluon.http import HTTP from gzip import open as gzopen from gluon.recfile import generate from gluon._compat import PY2 __all__ = [ 'parse_version', 'read_file', 'write_file', 'readlines_file', 'up', 'abspath', 'mktree', 'listdir', 'recursive_unlink', 'cleanpath', 'tar', 'untar', 'tar_compiled', 'get_session', 'check_credentials', 'w2p_pack', 'w2p_unpack', 'w2p_pack_plugin', 'w2p_unpack_plugin', 'fix_newlines', 'make_fake_file_like_object', ] def parse_semantic(version="Version 1.99.0-rc.1+timestamp.2011.09.19.08.23.26"): """Parses a version string according to http://semver.org/ rules Args: version(str): the SemVer string Returns: tuple: Major, Minor, Patch, Release, Build Date """ re_version = re.compile('(\d+)\.(\d+)\.(\d+)(\-(?P
[^\s+]*))?(\+(?P\S*))')
    m = re_version.match(version.strip().split()[-1])
    if not m:
        return None
    a, b, c = int(m.group(1)), int(m.group(2)), int(m.group(3))
    pre_release = m.group('pre') or ''
    build = m.group('build') or ''
    if build.startswith('timestamp'):
        build = datetime.datetime.strptime(build.split('.', 1)[1], '%Y.%m.%d.%H.%M.%S')
    return (a, b, c, pre_release, build)


def parse_legacy(version="Version 1.99.0 (2011-09-19 08:23:26)"):
    """Parses "legacy" version string

    Args:
        version(str): the version string

    Returns:
        tuple: Major, Minor, Patch, Release, Build Date

    """
    re_version = re.compile('[^\d]+ (\d+)\.(\d+)\.(\d+)\s*\((?P.+?)\)\s*(?P[a-z]+)?')
    m = re_version.match(version)
    a, b, c = int(m.group(1)), int(m.group(2)), int(m.group(3)),
    pre_release = m.group('type') or 'dev'
    build = datetime.datetime.strptime(m.group('datetime'), '%Y-%m-%d %H:%M:%S')
    return (a, b, c, pre_release, build)


def parse_version(version):
    """Attempts to parse SemVer, fallbacks on legacy
    """
    version_tuple = parse_semantic(version)
    if not version_tuple:
        version_tuple = parse_legacy(version)
    return version_tuple


def open_file(filename, mode):
    if PY2 or 'b' in mode:
        f = open(filename, mode)
    else:
        f = open(filename, mode, encoding="utf8")
    return f


def read_file(filename, mode='r'):
    """Returns content from filename, making sure to close the file explicitly
    on exit.
    """
    f = open_file(filename, mode)
    try:
        return f.read()
    finally:
        f.close()


def write_file(filename, value, mode='w'):
    """Writes  to filename, making sure to close the file
    explicitly on exit.
    """
    f = open_file(filename, mode)
    try:
        return f.write(value)
    finally:
        f.close()


def readlines_file(filename, mode='r'):
    """Applies .split('\n') to the output of `read_file()`
    """
    return read_file(filename, mode).split('\n')


def mktree(path):
    head, tail = os.path.split(path)
    if head:
        if tail:
            mktree(head)
        if not os.path.exists(head):
            os.mkdir(head)


def listdir(path,
            expression='^.+$',
            drop=True,
            add_dirs=False,
            sort=True,
            maxnum=None,
            exclude_content_from=None
            ):
    """
    Like `os.listdir()` but you can specify a regex pattern to filter files.
    If `add_dirs` is True, the returned items will have the full path.
    """
    if exclude_content_from is None:
        exclude_content_from = []
    if path[-1:] != os.path.sep:
        path = path + os.path.sep
    if drop:
        n = len(path)
    else:
        n = 0
    regex = re.compile(expression)
    items = []
    for (root, dirs, files) in os.walk(path, topdown=True):
        for dir in dirs[:]:
            if dir.startswith('.'):
                dirs.remove(dir)
        if add_dirs:
            items.append(root[n:])
        for file in sorted(files):
            if regex.match(file) and not file.startswith('.'):
                if root not in exclude_content_from:
                    items.append(os.path.join(root, file)[n:])
            if maxnum and len(items) >= maxnum:
                break
    if sort:
        return sorted(items)
    else:
        return items


def recursive_unlink(f):
    """Deletes `f`. If it's a folder, also its contents will be deleted
    """
    if os.path.isdir(f):
        for s in os.listdir(f):
            recursive_unlink(os.path.join(f, s))
        os.rmdir(f)
    elif os.path.isfile(f):
        os.unlink(f)


def cleanpath(path):
    """Turns any expression/path into a valid filename. replaces / with _ and
    removes special characters.
    """

    items = path.split('.')
    if len(items) > 1:
        path = re.sub('[^\w\.]+', '_', '_'.join(items[:-1]) + '.'
                      + ''.join(items[-1:]))
    else:
        path = re.sub('[^\w\.]+', '_', ''.join(items[-1:]))
    return path


def _extractall(filename, path='.', members=None):
    tar = tarfile.TarFile(filename, 'r')
    ret = tar.extractall(path, members)
    tar.close()
    return ret


def tar(file, dir, expression='^.+$',
        filenames=None, exclude_content_from=None):
    """Tars dir into file, only tars file that match expression
    """

    tar = tarfile.TarFile(file, 'w')
    try:
        if filenames is None:
            filenames = listdir(dir, expression, add_dirs=True,
                exclude_content_from=exclude_content_from)
        for file in filenames:
            tar.add(os.path.join(dir, file), file, False)
    finally:
        tar.close()


def untar(file, dir):
    """Untar file into dir
    """

    _extractall(file, dir)


def w2p_pack(filename, path, compiled=False, filenames=None):
    """Packs a web2py application.

    Args:
        filename(str): path to the resulting archive
        path(str): path to the application
        compiled(bool): if `True` packs the compiled version
        filenames(list): adds filenames to the archive
    """
    filename = abspath(filename)
    path = abspath(path)
    tarname = filename + '.tar'
    if compiled:
        tar_compiled(tarname, path, '^[\w\.\-]+$',
                     exclude_content_from=['cache', 'sessions', 'errors'])
    else:
        tar(tarname, path, '^[\w\.\-]+$', filenames=filenames,
            exclude_content_from=['cache', 'sessions', 'errors'])
    w2pfp = gzopen(filename, 'wb')
    tarfp = open(tarname, 'rb')
    w2pfp.write(tarfp.read())
    w2pfp.close()
    tarfp.close()
    os.unlink(tarname)


def create_welcome_w2p():
    is_newinstall_file = os.path.exists('NEWINSTALL')
    if not os.path.exists('welcome.w2p') or is_newinstall_file:
        try:
            w2p_pack('welcome.w2p', 'applications/welcome')
            logging.info("New installation: created welcome.w2p file")
        except:
            logging.error("New installation error: unable to create welcome.w2p file")
            return
        if is_newinstall_file:
            try:
                os.unlink('NEWINSTALL')
                logging.info("New installation: removed NEWINSTALL file")
            except:
                logging.error("New installation error: unable to remove NEWINSTALL file")


def w2p_unpack(filename, path, delete_tar=True):

    if filename == 'welcome.w2p':
        create_welcome_w2p()
    filename = abspath(filename)
    path = abspath(path)
    if filename[-4:] == '.w2p' or filename[-3:] == '.gz':
        if filename[-4:] == '.w2p':
            tarname = filename[:-4] + '.tar'
        else:
            tarname = filename[:-3] + '.tar'
        fgzipped = gzopen(filename, 'rb')
        tarfile = open(tarname, 'wb')
        tarfile.write(fgzipped.read())
        tarfile.close()
        fgzipped.close()
    else:
        tarname = filename
    untar(tarname, path)
    if delete_tar:
        os.unlink(tarname)


def w2p_pack_plugin(filename, path, plugin_name):
    """Packs the given plugin into a w2p file.
    Will match files at::

        /*/plugin_[name].*
        /*/plugin_[name]/*

    """
    filename = abspath(filename)
    path = abspath(path)
    if not filename.endswith('web2py.plugin.%s.w2p' % plugin_name):
        raise Exception("Not a web2py plugin name")
    plugin_tarball = tarfile.open(filename, 'w:gz')
    try:
        app_dir = path
        while app_dir[-1] == '/':
            app_dir = app_dir[:-1]
        files1 = glob.glob(
            os.path.join(app_dir, '*/plugin_%s.*' % plugin_name))
        files2 = glob.glob(
            os.path.join(app_dir, '*/plugin_%s/*' % plugin_name))
        for file in files1 + files2:
            plugin_tarball.add(file, arcname=file[len(app_dir) + 1:])
    finally:
        plugin_tarball.close()


def w2p_unpack_plugin(filename, path, delete_tar=True):
    filename = abspath(filename)
    path = abspath(path)
    if not os.path.basename(filename).startswith('web2py.plugin.'):
        raise Exception("Not a web2py plugin")
    w2p_unpack(filename, path, delete_tar)


def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True,
                        exclude_content_from=exclude_content_from):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()


def up(path):
    return os.path.dirname(os.path.normpath(path))


def get_session(request, other_application='admin'):
    """Checks that user is authorized to access other_application"""
    if request.application == other_application:
        raise KeyError
    try:
        session_id = request.cookies['session_id_' + other_application].value
        session_filename = os.path.join(
            up(request.folder), other_application, 'sessions', session_id)
        if not os.path.exists(session_filename):
            session_filename = generate(session_filename)
        osession = storage.load_storage(session_filename)
    except Exception as e:
        osession = storage.Storage()
    return osession


def set_session(request, session, other_application='admin'):
    """Checks that user is authorized to access other_application"""
    if request.application == other_application:
        raise KeyError
    session_id = request.cookies['session_id_' + other_application].value
    session_filename = os.path.join(
        up(request.folder), other_application, 'sessions', session_id)
    storage.save_storage(session, session_filename)


def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = 'Sign in with your google account.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '%s' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request, s, other_application)
        return r


def fix_newlines(path):
    regex = re.compile(r'''(\r
|\r|
)''')
    for filename in listdir(path, '.*\.(py|html)$', drop=False):
        rdata = read_file(filename, 'r')
        wdata = regex.sub('\n', rdata)
        if wdata != rdata:
            write_file(filename, wdata, 'w')


def copystream(
    src,
    dest,
    size,
    chunk_size=10 ** 5,
):
    """
    this is here because I think there is a bug in shutil.copyfileobj
    """
    while size > 0:
        if size < chunk_size:
            data = src.read(size)
        else:
            data = src.read(chunk_size)
        length = len(data)
        if length > size:
            (data, length) = (data[:size], size)
        size -= length
        if length == 0:
            break
        dest.write(data)
        if length < chunk_size:
            break
    dest.seek(0)
    return


def make_fake_file_like_object():
    class LogFile(object):
        def write(self, value):
            pass

        def close(self):
            pass
    return LogFile()


from gluon.settings import global_settings  # we need to import settings here because
                                      # settings imports fileutils too


def abspath(*relpath, **base):
    """Converts relative path to absolute path based (by default) on
    applications_parent
    """
    path = os.path.join(*relpath)
    gluon = base.get('gluon', False)
    if os.path.isabs(path):
        return path
    if gluon:
        return os.path.join(global_settings.gluon_parent, path)
    return os.path.join(global_settings.applications_parent, path)