Source code for scripttest

# (c) 2005-2007 Ian Bicking and contributors; written for Paste
# Licensed under the MIT license:
#       http://www.opensource.org/licenses/mit-license.php
"""
Helpers for testing command-line scripts
"""
import sys
import os
import shutil
import shlex
import subprocess
import re
import zlib


if sys.platform == 'win32':
    def clean_environ(e):
        ret = dict(
            ((str(k), str(v)) for k, v in e.items()))
        return ret
else:
    def clean_environ(e):
        return e


def string(string):
    if sys.version_info >= (3,):
        if isinstance(string, str):
            return string
        return str(string, "utf-8")
    else:
        if isinstance(string, unicode):  # noqa
            return string
        return string.decode('utf-8')


# From pathutils by Michael Foord:
#       http://www.voidspace.org.uk/python/pathutils.html
def onerror(func, path, exc_info):
    """
    Error handler for ``shutil.rmtree``.

    If the error is due to an access error (read only file)
    it attempts to add write permission and then retries.

    If the error is for another reason it re-raises the error.

    Usage : ``shutil.rmtree(path, onerror=onerror)``

    """
    import stat
    if not os.access(path, os.W_OK):
        # Is the error an access error ?
        os.chmod(path, stat.S_IWUSR)
        func(path)
    else:
        raise

__all__ = ['TestFileEnvironment']

if sys.platform == 'win32':
    def full_executable_path(invoked, environ):

        if os.path.splitext(invoked)[1]:
            return invoked

        explicit_dir = os.path.dirname(invoked)

        if explicit_dir:
            path = [explicit_dir]
        else:
            path = environ.get('PATH').split(os.path.pathsep)

        extensions = environ.get(
            'PATHEXT',
            # Use *something* in case the environment variable is
            # empty.  These come from my machine's defaults
            '.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.PSC1'
        ).split(os.path.pathsep)

        for dir in path:
            for ext in extensions:
                full_path = os.path.join(dir, invoked + ext)
                if os.path.exists(full_path):
                    return full_path

        return invoked  # Not found; invoking it will likely fail

    class Popen(subprocess.Popen):
        def __init__(
                self, args, bufsize=0, executable=None,
                stdin=None, stdout=None, stderr=None,
                preexec_fn=None, close_fds=False, shell=False,
                cwd=None, env=None,
                *args_, **kw):

            if executable is None and not shell:
                executable = full_executable_path(args[0], env or os.environ)

            super(Popen, self).__init__(
                args, bufsize, executable, stdin, stdout, stderr,
                preexec_fn, close_fds, shell, cwd, env, *args_, **kw)

else:
    from subprocess import Popen


[docs]class TestFileEnvironment(object): """ This represents an environment in which files will be written, and scripts will be run. """ # for py.test disabled = True
[docs] def __init__(self, base_path=None, template_path=None, environ=None, cwd=None, start_clear=True, ignore_paths=None, ignore_hidden=True, capture_temp=False, assert_no_temp=False, split_cmd=True): """ Creates an environment. ``base_path`` is used as the current working directory, and generally where changes are looked for. If not given, it will be the directory of the calling script plus ``test-output/``. ``template_path`` is the directory to look for *template* files, which are files you'll explicitly add to the environment. This is done with ``.writefile()``. ``environ`` is the operating system environment, ``os.environ`` if not given. ``cwd`` is the working directory, ``base_path`` by default. If ``start_clear`` is true (default) then the ``base_path`` will be cleared (all files deleted) when an instance is created. You can also use ``.clear()`` to clear the files. ``ignore_paths`` is a set of specific filenames that should be ignored when created in the environment. ``ignore_hidden`` means, if true (default) that filenames and directories starting with ``'.'`` will be ignored. ``capture_temp`` will put temporary files inside the environment (using ``$TMPDIR``). You can then assert that no temporary files are left using ``.assert_no_temp()``. """ if base_path is None: base_path = self._guess_base_path(1) self.base_path = base_path self.template_path = template_path if environ is None: environ = os.environ.copy() self.environ = environ if cwd is None: cwd = base_path self.cwd = cwd self.capture_temp = capture_temp if self.capture_temp: self.temp_path = os.path.join(self.base_path, 'tmp') self.environ['TMPDIR'] = self.temp_path else: self.temp_path = None if start_clear: self.clear() elif not os.path.exists(base_path): os.makedirs(base_path) self.ignore_paths = ignore_paths or [] self.ignore_hidden = ignore_hidden self.split_cmd = split_cmd if assert_no_temp and not self.capture_temp: raise TypeError( 'You cannot use assert_no_temp unless capture_temp=True') self._assert_no_temp = assert_no_temp self.split_cmd = split_cmd
def _guess_base_path(self, stack_level): frame = sys._getframe(stack_level + 1) file = frame.f_globals.get('__file__') if not file: raise TypeError( "Could not guess a base_path argument from the calling scope " "(no __file__ found)") dir = os.path.dirname(file) return os.path.join(dir, 'test-output')
[docs] def run(self, script, *args, **kw): """ Run the command, with the given arguments. The ``script`` argument can have space-separated arguments, or you can use the positional arguments. Keywords allowed are: ``expect_error``: (default False) Don't raise an exception in case of errors ``expect_stderr``: (default ``expect_error``) Don't raise an exception if anything is printed to stderr ``stdin``: (default ``""``) Input to the script ``cwd``: (default ``self.cwd``) The working directory to run in (default ``base_path``) ``quiet``: (default False) When there's an error (return code != 0), do not print stdout/stderr Returns a `ProcResult <class-paste.fixture.ProcResult.html>`_ object. """ __tracebackhide__ = True expect_error = kw.pop('expect_error', False) expect_stderr = kw.pop('expect_stderr', expect_error) cwd = kw.pop('cwd', self.cwd) stdin = kw.pop('stdin', None) quiet = kw.pop('quiet', False) debug = kw.pop('debug', False) if not self.temp_path: if 'expect_temp' in kw: raise TypeError( 'You cannot use expect_temp unless you use ' 'capture_temp=True') expect_temp = kw.pop('expect_temp', not self._assert_no_temp) args = list(map(str, args)) assert not kw, ( "Arguments not expected: %s" % ', '.join(kw.keys())) if self.split_cmd and ' ' in script: if args: # Then treat this as a script that has a space in it pass else: script, args = script.split(None, 1) args = shlex.split(args) all = [script] + args files_before = self._find_files() if debug: proc = subprocess.Popen(all, cwd=cwd, # see http://bugs.python.org/issue8557 shell=(sys.platform == 'win32'), env=clean_environ(self.environ)) else: proc = subprocess.Popen(all, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, cwd=cwd, # see http://bugs.python.org/issue8557 shell=(sys.platform == 'win32'), env=clean_environ(self.environ)) if debug: stdout, stderr = proc.communicate() else: stdout, stderr = proc.communicate(stdin) stdout = string(stdout) stderr = string(stderr) stdout = string(stdout).replace('\r\n', '\n') stderr = string(stderr).replace('\r\n', '\n') files_after = self._find_files() result = ProcResult( self, all, stdin, stdout, stderr, returncode=proc.returncode, files_before=files_before, files_after=files_after) if not expect_error: result.assert_no_error(quiet) if not expect_stderr: result.assert_no_stderr(quiet) if not expect_temp: result.assert_no_temp(quiet) return result
def _find_files(self): result = {} for fn in os.listdir(self.base_path): if self._ignore_file(fn): continue self._find_traverse(fn, result) return result def _ignore_file(self, fn): if fn in self.ignore_paths: return True if self.ignore_hidden and os.path.basename(fn).startswith('.'): return True return False def _find_traverse(self, path, result): full = os.path.join(self.base_path, path) if os.path.isdir(full): if not self.temp_path or path != 'tmp': result[path] = FoundDir(self.base_path, path) for fn in os.listdir(full): fn = os.path.join(path, fn) if self._ignore_file(fn): continue self._find_traverse(fn, result) else: result[path] = FoundFile(self.base_path, path)
[docs] def clear(self, force=False): """ Delete all the files in the base directory. """ marker_file = os.path.join(self.base_path, '.scripttest-test-dir.txt') if os.path.exists(self.base_path): if not force and not os.path.exists(marker_file): sys.stderr.write( 'The directory %s does not appear to have been created by ' 'ScriptTest\n' % self.base_path) sys.stderr.write( 'The directory %s must be a scratch directory; it will be ' 'wiped after every test run\n' % self.base_path) sys.stderr.write('Please delete this directory manually\n') raise AssertionError( "The directory %s was not created by ScriptTest; it must " "be deleted manually" % self.base_path) shutil.rmtree(self.base_path, onerror=onerror) os.mkdir(self.base_path) f = open(marker_file, 'w') f.write('placeholder') f.close() if self.temp_path and not os.path.exists(self.temp_path): os.makedirs(self.temp_path)
[docs] def writefile(self, path, content=None, frompath=None): """ Write a file to the given path. If ``content`` is given then that text is written, otherwise the file in ``frompath`` is used. ``frompath`` is relative to ``self.template_path`` """ full = os.path.join(self.base_path, path) if not os.path.exists(os.path.dirname(full)): os.makedirs(os.path.dirname(full)) f = open(full, 'wb') if content is not None: f.write(content) if frompath is not None: if self.template_path: frompath = os.path.join(self.template_path, frompath) f2 = open(frompath, 'rb') f.write(f2.read()) f2.close() f.close() return FoundFile(self.base_path, path)
[docs] def assert_no_temp(self): """If you use ``capture_temp`` then you can use this to make sure no files have been left in the temporary directory""" __tracebackhide__ = True if not self.temp_path: raise Exception('You cannot use assert_no_error unless you ' 'instantiate ' 'TestFileEnvironment(capture_temp=True)') names = os.listdir(self.temp_path) if not names: return new_names = [] for name in names: if os.path.isdir(os.path.join(self.temp_path, name)): name += '/' new_names.append(name) raise AssertionError( 'Temporary files left over: %s' % ', '.join(sorted(names)))
[docs]class ProcResult(object): """ Represents the results of running a command in `TestFileEnvironment <class-paste.fixture.TestFileEnvironment.html>`_. Attributes to pay particular attention to: ``stdout``, ``stderr``: What is produced on those streams. ``returncode``: The return code of the script. ``files_created``, ``files_deleted``, ``files_updated``: Dictionaries mapping filenames (relative to the ``base_path``) to `FoundFile <class-paste.fixture.FoundFile.html>`_ or `FoundDir <class-paste.fixture.FoundDir.html>`_ objects. """ def __init__(self, test_env, args, stdin, stdout, stderr, returncode, files_before, files_after): self.test_env = test_env self.args = args self.stdin = stdin self.stdout = stdout self.stderr = stderr self.returncode = returncode self.files_before = files_before self.files_after = files_after self.files_deleted = {} self.files_updated = {} self.files_created = files_after.copy() for path, f in files_before.items(): if path not in files_after: self.files_deleted[path] = f continue del self.files_created[path] if f != files_after[path]: self.files_updated[path] = files_after[path] if sys.platform == 'win32': self.stdout = self.stdout.replace('\n\r', '\n') self.stderr = self.stderr.replace('\n\r', '\n') def assert_no_error(self, quiet): __tracebackhide__ = True if self.returncode != 0: if not quiet: print(self) raise AssertionError( "Script returned code: %s" % self.returncode) def assert_no_stderr(self, quiet): __tracebackhide__ = True if self.stderr: if not quiet: print(self) else: print('Error output:') print(self.stderr) raise AssertionError("stderr output not expected") def assert_no_temp(self, quiet): __tracebackhide__ = True files = self.wildcard_matches('tmp/**') if files: if not quiet: print(self) else: print('Temp files:') print(', '.join(sorted( f.path for f in sorted(files, key=lambda x: x.path) ))) raise AssertionError("temp files not expected") def wildcard_matches(self, wildcard): """Return all the file objects whose path matches the given wildcard. You can use ``*`` to match any portion of a filename, and ``**`` to match multiple segments/directories. """ regex_parts = [] for index, part in enumerate(wildcard.split('**')): if index: regex_parts.append('.*') for internal_index, internal_part in enumerate(part.split('*')): if internal_index: regex_parts.append('[^/\\\\]*') regex_parts.append(re.escape(internal_part)) regex = ''.join(regex_parts) + '$' regex = re.compile(regex) results = [] for container in self.files_updated, self.files_created: for key, value in sorted(container.items()): if regex.match(key): results.append(value) return results def __str__(self): s = ['Script result: %s' % ' '.join(self.args)] if self.returncode: s.append(' return code: %s' % self.returncode) if self.stderr: s.append('-- stderr: --------------------') s.append(self.stderr) if self.stdout: s.append('-- stdout: --------------------') s.append(self.stdout) for name, files, show_size in [ ('created', self.files_created, True), ('deleted', self.files_deleted, True), ('updated', self.files_updated, True)]: if files: s.append('-- %s: -------------------' % name) files = list(files.items()) files.sort() last = '' for path, f in files: t = ' %s' % _space_prefix(last, path, indent=4, include_sep=False) last = path if f.invalid: t += ' (invalid link)' else: if show_size and f.size != 'N/A': t += ' (%s bytes)' % f.size s.append(t) return '\n'.join(s)
[docs]class FoundFile(object): """ Represents a single file found as the result of a command. Has attributes: ``path``: The path of the file, relative to the ``base_path`` ``full``: The full path ``bytes``: The contents of the file. ``stat``: The results of ``os.stat``. Also ``mtime`` and ``size`` contain the ``.st_mtime`` and ``.st_size`` of the stat. ``mtime``: The modification time of the file. ``size``: The size (in bytes) of the file. You may use the ``in`` operator with these objects (tested against the contents of the file), and the ``.mustcontain()`` method. """ file = True dir = False invalid = False def __init__(self, base_path, path): self.base_path = base_path self.path = path self.full = os.path.join(base_path, path) if os.path.exists(self.full): self.stat = os.stat(self.full) self.mtime = self.stat.st_mtime self.size = self.stat.st_size with open(self.full, "rb") as fp: self.hash = zlib.crc32(fp.read()) else: self.invalid = True self.stat = self.mtime = None self.size = 'N/A' self.hash = None self._bytes = None def bytes__get(self): if self._bytes is None: f = open(self.full, 'rb') self._bytes = string(f.read()) f.close() return self._bytes bytes = property(bytes__get) def __contains__(self, s): return s in self.bytes def mustcontain(self, s): __tracebackhide__ = True bytes = self.bytes if s not in bytes: print('Could not find %r in:' % s) print(bytes) assert s in bytes def __repr__(self): return '<%s %s:%s>' % ( self.__class__.__name__, self.base_path, self.path) def __eq__(self, other): if not isinstance(other, FoundFile): return NotImplemented return ( self.hash == other.hash and self.mtime == other.mtime and self.size == other.size ) def __ne__(self, other): return not self == other
[docs]class FoundDir(object): """ Represents a directory created by a command. """ file = False dir = True invalid = False def __init__(self, base_path, path): self.base_path = base_path self.path = path self.full = os.path.join(base_path, path) self.stat = os.stat(self.full) self.size = 'N/A' self.mtime = self.stat.st_mtime def __repr__(self): return '<%s %s:%s>' % ( self.__class__.__name__, self.base_path, self.path) def __eq__(self, other): if not isinstance(other, FoundDir): return NotImplemented return self.mtime == other.mtime def __ne__(self, other): return not self == other
def _space_prefix(pref, full, sep=None, indent=None, include_sep=True): """ Anything shared by pref and full will be replaced with spaces in full, and full returned. """ if sep is None: sep = os.path.sep pref = pref.split(sep) full = full.split(sep) padding = [] while pref and full and pref[0] == full[0]: if indent is None: padding.append(' ' * (len(full[0]) + len(sep))) else: padding.append(' ' * indent) full.pop(0) pref.pop(0) if padding: if include_sep: return ''.join(padding) + sep + sep.join(full) else: return ''.join(padding) + sep.join(full) else: return sep.join(full)