From 19dda7c9bdc8ef71c792e0f77a9595bfad8d9248 Mon Sep 17 00:00:00 2001 From: Anthony Sottile Date: Wed, 19 Oct 2022 10:10:49 -0400 Subject: [PATCH] vendor py.path and py.error --- .pre-commit-config.yaml | 1 - setup.cfg | 3 +- src/_pytest/_py/__init__.py | 0 src/_pytest/_py/error.py | 91 +++ src/_pytest/_py/path.py | 1489 +++++++++++++++++++++++++++++++++++ src/py.py | 10 + 6 files changed, 1592 insertions(+), 2 deletions(-) create mode 100644 src/_pytest/_py/__init__.py create mode 100644 src/_pytest/_py/error.py create mode 100644 src/_pytest/_py/path.py create mode 100644 src/py.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3cc1b1de718..de612d96988 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -65,7 +65,6 @@ repos: args: [] additional_dependencies: - iniconfig>=1.1.0 - - py>=1.8.2 - attrs>=19.2.0 - packaging - tomli diff --git a/setup.cfg b/setup.cfg index 38f50556c93..39ade4dff4c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,16 +36,17 @@ packages = _pytest _pytest._code _pytest._io + _pytest._py _pytest.assertion _pytest.config _pytest.mark pytest +py_modules = py install_requires = attrs>=19.2.0 iniconfig packaging pluggy>=0.12,<2.0 - py>=1.8.2 colorama;sys_platform=="win32" exceptiongroup>=1.0.0rc8;python_version<"3.11" importlib-metadata>=0.12;python_version<"3.8" diff --git a/src/_pytest/_py/__init__.py b/src/_pytest/_py/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/_pytest/_py/error.py b/src/_pytest/_py/error.py new file mode 100644 index 00000000000..e1e3ccd28af --- /dev/null +++ b/src/_pytest/_py/error.py @@ -0,0 +1,91 @@ +""" +create errno-specific classes for IO or os calls. + +""" +from types import ModuleType +import sys, os, errno + +class Error(EnvironmentError): + def __repr__(self): + return "%s.%s %r: %s " %(self.__class__.__module__, + self.__class__.__name__, + self.__class__.__doc__, + " ".join(map(str, self.args)), + #repr(self.args) + ) + + def __str__(self): + s = "[%s]: %s" %(self.__class__.__doc__, + " ".join(map(str, self.args)), + ) + return s + +_winerrnomap = { + 2: errno.ENOENT, + 3: errno.ENOENT, + 17: errno.EEXIST, + 18: errno.EXDEV, + 13: errno.EBUSY, # empty cd drive, but ENOMEDIUM seems unavailiable + 22: errno.ENOTDIR, + 20: errno.ENOTDIR, + 267: errno.ENOTDIR, + 5: errno.EACCES, # anything better? +} + +class ErrorMaker(ModuleType): + """ lazily provides Exception classes for each possible POSIX errno + (as defined per the 'errno' module). All such instances + subclass EnvironmentError. + """ + Error = Error + _errno2class = {} + + def __getattr__(self, name): + if name[0] == "_": + raise AttributeError(name) + eno = getattr(errno, name) + cls = self._geterrnoclass(eno) + setattr(self, name, cls) + return cls + + def _geterrnoclass(self, eno): + try: + return self._errno2class[eno] + except KeyError: + clsname = errno.errorcode.get(eno, "UnknownErrno%d" %(eno,)) + errorcls = type(Error)(clsname, (Error,), + {'__module__':'py.error', + '__doc__': os.strerror(eno)}) + self._errno2class[eno] = errorcls + return errorcls + + def checked_call(self, func, *args, **kwargs): + """ call a function and raise an errno-exception if applicable. """ + __tracebackhide__ = True + try: + return func(*args, **kwargs) + except self.Error: + raise + except (OSError, EnvironmentError): + cls, value, tb = sys.exc_info() + if not hasattr(value, 'errno'): + raise + __tracebackhide__ = False + errno = value.errno + try: + if not isinstance(value, WindowsError): + raise NameError + except NameError: + # we are not on Windows, or we got a proper OSError + cls = self._geterrnoclass(errno) + else: + try: + cls = self._geterrnoclass(_winerrnomap[errno]) + except KeyError: + raise value + raise cls("%s%r" % (func.__name__, args)) + __tracebackhide__ = True + + +error = ErrorMaker('_pytest._py.error') +sys.modules[error.__name__] = error diff --git a/src/_pytest/_py/path.py b/src/_pytest/_py/path.py new file mode 100644 index 00000000000..e8adbf27a79 --- /dev/null +++ b/src/_pytest/_py/path.py @@ -0,0 +1,1489 @@ +""" +local path implementation. +""" +from __future__ import with_statement + +from contextlib import contextmanager +import sys, os, atexit, io, uuid +from stat import S_ISLNK, S_ISDIR, S_ISREG + +from os.path import abspath, normpath, isabs, exists, isdir, isfile, islink, dirname + +import warnings +import os +import sys +import posixpath +import fnmatch + +from . import error + +# Moved from local.py. +iswin32 = sys.platform == "win32" or (getattr(os, '_name', False) == 'nt') + +try: + # FileNotFoundError might happen in py34, and is not available with py27. + import_errors = (ImportError, FileNotFoundError) +except NameError: + import_errors = (ImportError,) + +try: + from os import fspath +except ImportError: + def fspath(path): + """ + Return the string representation of the path. + If str or bytes is passed in, it is returned unchanged. + This code comes from PEP 519, modified to support earlier versions of + python. + + This is required for python < 3.6. + """ + if isinstance(path, (str, bytes)): + return path + + # Work from the object's type to match method resolution of other magic + # methods. + path_type = type(path) + try: + return path_type.__fspath__(path) + except AttributeError: + if hasattr(path_type, '__fspath__'): + raise + try: + import pathlib + except import_errors: + pass + else: + if isinstance(path, pathlib.PurePath): + return str(path) + + raise TypeError("expected str, bytes or os.PathLike object, not " + + path_type.__name__) + +class Checkers: + _depend_on_existence = 'exists', 'link', 'dir', 'file' + + def __init__(self, path): + self.path = path + + def dir(self): + raise NotImplementedError + + def file(self): + raise NotImplementedError + + def dotfile(self): + return self.path.basename.startswith('.') + + def ext(self, arg): + if not arg.startswith('.'): + arg = '.' + arg + return self.path.ext == arg + + def exists(self): + raise NotImplementedError + + def basename(self, arg): + return self.path.basename == arg + + def basestarts(self, arg): + return self.path.basename.startswith(arg) + + def relto(self, arg): + return self.path.relto(arg) + + def fnmatch(self, arg): + return self.path.fnmatch(arg) + + def endswith(self, arg): + return str(self.path).endswith(arg) + + def _evaluate(self, kw): + for name, value in kw.items(): + invert = False + meth = None + try: + meth = getattr(self, name) + except AttributeError: + if name[:3] == 'not': + invert = True + try: + meth = getattr(self, name[3:]) + except AttributeError: + pass + if meth is None: + raise TypeError( + "no %r checker available for %r" % (name, self.path)) + try: + if py.code.getrawcode(meth).co_argcount > 1: + if (not meth(value)) ^ invert: + return False + else: + if bool(value) ^ bool(meth()) ^ invert: + return False + except (error.ENOENT, error.ENOTDIR, error.EBUSY): + # EBUSY feels not entirely correct, + # but its kind of necessary since ENOMEDIUM + # is not accessible in python + for name in self._depend_on_existence: + if name in kw: + if kw.get(name): + return False + name = 'not' + name + if name in kw: + if not kw.get(name): + return False + return True + +class NeverRaised(Exception): + pass + +class PathBase(object): + """ shared implementation for filesystem path objects.""" + Checkers = Checkers + + def __div__(self, other): + return self.join(fspath(other)) + __truediv__ = __div__ # py3k + + def basename(self): + """ basename part of path. """ + return self._getbyspec('basename')[0] + basename = property(basename, None, None, basename.__doc__) + + def dirname(self): + """ dirname part of path. """ + return self._getbyspec('dirname')[0] + dirname = property(dirname, None, None, dirname.__doc__) + + def purebasename(self): + """ pure base name of the path.""" + return self._getbyspec('purebasename')[0] + purebasename = property(purebasename, None, None, purebasename.__doc__) + + def ext(self): + """ extension of the path (including the '.').""" + return self._getbyspec('ext')[0] + ext = property(ext, None, None, ext.__doc__) + + def dirpath(self, *args, **kwargs): + """ return the directory path joined with any given path arguments. """ + return self.new(basename='').join(*args, **kwargs) + + def read_binary(self): + """ read and return a bytestring from reading the path. """ + with self.open('rb') as f: + return f.read() + + def read_text(self, encoding): + """ read and return a Unicode string from reading the path. """ + with self.open("r", encoding=encoding) as f: + return f.read() + + + def read(self, mode='r'): + """ read and return a bytestring from reading the path. """ + with self.open(mode) as f: + return f.read() + + def readlines(self, cr=1): + """ read and return a list of lines from the path. if cr is False, the +newline will be removed from the end of each line. """ + if sys.version_info < (3, ): + mode = 'rU' + else: # python 3 deprecates mode "U" in favor of "newline" option + mode = 'r' + + if not cr: + content = self.read(mode) + return content.split('\n') + else: + f = self.open(mode) + try: + return f.readlines() + finally: + f.close() + + def load(self): + """ (deprecated) return object unpickled from self.read() """ + f = self.open('rb') + try: + import pickle + return error.checked_call(pickle.load, f) + finally: + f.close() + + def move(self, target): + """ move this path to target. """ + if target.relto(self): + raise error.EINVAL( + target, + "cannot move path into a subdirectory of itself") + try: + self.rename(target) + except error.EXDEV: # invalid cross-device link + self.copy(target) + self.remove() + + def __repr__(self): + """ return a string representation of this path. """ + return repr(str(self)) + + def check(self, **kw): + """ check a path for existence and properties. + + Without arguments, return True if the path exists, otherwise False. + + valid checkers:: + + file=1 # is a file + file=0 # is not a file (may not even exist) + dir=1 # is a dir + link=1 # is a link + exists=1 # exists + + You can specify multiple checker definitions, for example:: + + path.check(file=1, link=1) # a link pointing to a file + """ + if not kw: + kw = {'exists': 1} + return self.Checkers(self)._evaluate(kw) + + def fnmatch(self, pattern): + """return true if the basename/fullname matches the glob-'pattern'. + + valid pattern characters:: + + * matches everything + ? matches any single character + [seq] matches any character in seq + [!seq] matches any char not in seq + + If the pattern contains a path-separator then the full path + is used for pattern matching and a '*' is prepended to the + pattern. + + if the pattern doesn't contain a path-separator the pattern + is only matched against the basename. + """ + return FNMatcher(pattern)(self) + + def relto(self, relpath): + """ return a string which is the relative part of the path + to the given 'relpath'. + """ + if not isinstance(relpath, (str, PathBase)): + raise TypeError("%r: not a string or path object" %(relpath,)) + strrelpath = str(relpath) + if strrelpath and strrelpath[-1] != self.sep: + strrelpath += self.sep + #assert strrelpath[-1] == self.sep + #assert strrelpath[-2] != self.sep + strself = self.strpath + if sys.platform == "win32" or getattr(os, '_name', None) == 'nt': + if os.path.normcase(strself).startswith( + os.path.normcase(strrelpath)): + return strself[len(strrelpath):] + elif strself.startswith(strrelpath): + return strself[len(strrelpath):] + return "" + + def ensure_dir(self, *args): + """ ensure the path joined with args is a directory. """ + return self.ensure(*args, **{"dir": True}) + + def bestrelpath(self, dest): + """ return a string which is a relative path from self + (assumed to be a directory) to dest such that + self.join(bestrelpath) == dest and if not such + path can be determined return dest. + """ + try: + if self == dest: + return os.curdir + base = self.common(dest) + if not base: # can be the case on windows + return str(dest) + self2base = self.relto(base) + reldest = dest.relto(base) + if self2base: + n = self2base.count(self.sep) + 1 + else: + n = 0 + l = [os.pardir] * n + if reldest: + l.append(reldest) + target = dest.sep.join(l) + return target + except AttributeError: + return str(dest) + + def exists(self): + return self.check() + + def isdir(self): + return self.check(dir=1) + + def isfile(self): + return self.check(file=1) + + def parts(self, reverse=False): + """ return a root-first list of all ancestor directories + plus the path itself. + """ + current = self + l = [self] + while 1: + last = current + current = current.dirpath() + if last == current: + break + l.append(current) + if not reverse: + l.reverse() + return l + + def common(self, other): + """ return the common part shared with the other path + or None if there is no common part. + """ + last = None + for x, y in zip(self.parts(), other.parts()): + if x != y: + return last + last = x + return last + + def __add__(self, other): + """ return new path object with 'other' added to the basename""" + return self.new(basename=self.basename+str(other)) + + def __cmp__(self, other): + """ return sort value (-1, 0, +1). """ + try: + return cmp(self.strpath, other.strpath) + except AttributeError: + return cmp(str(self), str(other)) # self.path, other.path) + + def __lt__(self, other): + try: + return self.strpath < other.strpath + except AttributeError: + return str(self) < str(other) + + def visit(self, fil=None, rec=None, ignore=NeverRaised, bf=False, sort=False): + """ yields all paths below the current one + + fil is a filter (glob pattern or callable), if not matching the + path will not be yielded, defaulting to None (everything is + returned) + + rec is a filter (glob pattern or callable) that controls whether + a node is descended, defaulting to None + + ignore is an Exception class that is ignoredwhen calling dirlist() + on any of the paths (by default, all exceptions are reported) + + bf if True will cause a breadthfirst search instead of the + default depthfirst. Default: False + + sort if True will sort entries within each directory level. + """ + for x in Visitor(fil, rec, ignore, bf, sort).gen(self): + yield x + + def _sortlist(self, res, sort): + if sort: + if hasattr(sort, '__call__'): + warnings.warn(DeprecationWarning( + "listdir(sort=callable) is deprecated and breaks on python3" + ), stacklevel=3) + res.sort(sort) + else: + res.sort() + + def samefile(self, other): + """ return True if other refers to the same stat object as self. """ + return self.strpath == str(other) + + def __fspath__(self): + return self.strpath + +class Visitor: + def __init__(self, fil, rec, ignore, bf, sort): + if isinstance(fil, py.builtin._basestring): + fil = FNMatcher(fil) + if isinstance(rec, py.builtin._basestring): + self.rec = FNMatcher(rec) + elif not hasattr(rec, '__call__') and rec: + self.rec = lambda path: True + else: + self.rec = rec + self.fil = fil + self.ignore = ignore + self.breadthfirst = bf + self.optsort = sort and sorted or (lambda x: x) + + def gen(self, path): + try: + entries = path.listdir() + except self.ignore: + return + rec = self.rec + dirs = self.optsort([p for p in entries + if p.check(dir=1) and (rec is None or rec(p))]) + if not self.breadthfirst: + for subdir in dirs: + for p in self.gen(subdir): + yield p + for p in self.optsort(entries): + if self.fil is None or self.fil(p): + yield p + if self.breadthfirst: + for subdir in dirs: + for p in self.gen(subdir): + yield p + +class FNMatcher: + def __init__(self, pattern): + self.pattern = pattern + + def __call__(self, path): + pattern = self.pattern + + if (pattern.find(path.sep) == -1 and + iswin32 and + pattern.find(posixpath.sep) != -1): + # Running on Windows, the pattern has no Windows path separators, + # and the pattern has one or more Posix path separators. Replace + # the Posix path separators with the Windows path separator. + pattern = pattern.replace(posixpath.sep, path.sep) + + if pattern.find(path.sep) == -1: + name = path.basename + else: + name = str(path) # path.strpath # XXX svn? + if not os.path.isabs(pattern): + pattern = '*' + path.sep + pattern + return fnmatch.fnmatch(name, pattern) + + +if sys.version_info > (3,0): + def map_as_list(func, iter): + return list(map(func, iter)) +else: + map_as_list = map + +ALLOW_IMPORTLIB_MODE = sys.version_info > (3,5) +if ALLOW_IMPORTLIB_MODE: + import importlib + + +class Stat(object): + def __getattr__(self, name): + return getattr(self._osstatresult, "st_" + name) + + def __init__(self, path, osstatresult): + self.path = path + self._osstatresult = osstatresult + + @property + def owner(self): + if iswin32: + raise NotImplementedError("XXX win32") + import pwd + entry = error.checked_call(pwd.getpwuid, self.uid) + return entry[0] + + @property + def group(self): + """ return group name of file. """ + if iswin32: + raise NotImplementedError("XXX win32") + import grp + entry = error.checked_call(grp.getgrgid, self.gid) + return entry[0] + + def isdir(self): + return S_ISDIR(self._osstatresult.st_mode) + + def isfile(self): + return S_ISREG(self._osstatresult.st_mode) + + def islink(self): + st = self.path.lstat() + return S_ISLNK(self._osstatresult.st_mode) + +class PosixPath(PathBase): + def chown(self, user, group, rec=0): + """ change ownership to the given user and group. + user and group may be specified by a number or + by a name. if rec is True change ownership + recursively. + """ + uid = getuserid(user) + gid = getgroupid(group) + if rec: + for x in self.visit(rec=lambda x: x.check(link=0)): + if x.check(link=0): + error.checked_call(os.chown, str(x), uid, gid) + error.checked_call(os.chown, str(self), uid, gid) + + def readlink(self): + """ return value of a symbolic link. """ + return error.checked_call(os.readlink, self.strpath) + + def mklinkto(self, oldname): + """ posix style hard link to another name. """ + error.checked_call(os.link, str(oldname), str(self)) + + def mksymlinkto(self, value, absolute=1): + """ create a symbolic link with the given value (pointing to another name). """ + if absolute: + error.checked_call(os.symlink, str(value), self.strpath) + else: + base = self.common(value) + # with posix local paths '/' is always a common base + relsource = self.__class__(value).relto(base) + reldest = self.relto(base) + n = reldest.count(self.sep) + target = self.sep.join(('..', )*n + (relsource, )) + error.checked_call(os.symlink, target, self.strpath) + +def getuserid(user): + import pwd + if not isinstance(user, int): + user = pwd.getpwnam(user)[2] + return user + +def getgroupid(group): + import grp + if not isinstance(group, int): + group = grp.getgrnam(group)[2] + return group + +FSBase = not iswin32 and PosixPath or PathBase + +class LocalPath(FSBase): + """ object oriented interface to os.path and other local filesystem + related information. + """ + class ImportMismatchError(ImportError): + """ raised on pyimport() if there is a mismatch of __file__'s""" + + sep = os.sep + class Checkers(Checkers): + def _stat(self): + try: + return self._statcache + except AttributeError: + try: + self._statcache = self.path.stat() + except error.ELOOP: + self._statcache = self.path.lstat() + return self._statcache + + def dir(self): + return S_ISDIR(self._stat().mode) + + def file(self): + return S_ISREG(self._stat().mode) + + def exists(self): + return self._stat() + + def link(self): + st = self.path.lstat() + return S_ISLNK(st.mode) + + def __init__(self, path=None, expanduser=False): + """ Initialize and return a local Path instance. + + Path can be relative to the current directory. + If path is None it defaults to the current working directory. + If expanduser is True, tilde-expansion is performed. + Note that Path instances always carry an absolute path. + Note also that passing in a local path object will simply return + the exact same path object. Use new() to get a new copy. + """ + if path is None: + self.strpath = error.checked_call(os.getcwd) + else: + try: + path = fspath(path) + except TypeError: + raise ValueError("can only pass None, Path instances " + "or non-empty strings to LocalPath") + if expanduser: + path = os.path.expanduser(path) + self.strpath = abspath(path) + + def __hash__(self): + s = self.strpath + if iswin32: + s = s.lower() + return hash(s) + + def __eq__(self, other): + s1 = fspath(self) + try: + s2 = fspath(other) + except TypeError: + return False + if iswin32: + s1 = s1.lower() + try: + s2 = s2.lower() + except AttributeError: + return False + return s1 == s2 + + def __ne__(self, other): + return not (self == other) + + def __lt__(self, other): + return fspath(self) < fspath(other) + + def __gt__(self, other): + return fspath(self) > fspath(other) + + def samefile(self, other): + """ return True if 'other' references the same file as 'self'. + """ + other = fspath(other) + if not isabs(other): + other = abspath(other) + if self == other: + return True + if not hasattr(os.path, "samefile"): + return False + return error.checked_call( + os.path.samefile, self.strpath, other) + + def remove(self, rec=1, ignore_errors=False): + """ remove a file or directory (or a directory tree if rec=1). + if ignore_errors is True, errors while removing directories will + be ignored. + """ + if self.check(dir=1, link=0): + if rec: + # force remove of readonly files on windows + if iswin32: + self.chmod(0o700, rec=1) + import shutil + error.checked_call( + shutil.rmtree, self.strpath, + ignore_errors=ignore_errors) + else: + error.checked_call(os.rmdir, self.strpath) + else: + if iswin32: + self.chmod(0o700) + error.checked_call(os.remove, self.strpath) + + def computehash(self, hashtype="md5", chunksize=524288): + """ return hexdigest of hashvalue for this file. """ + try: + try: + import hashlib as mod + except ImportError: + if hashtype == "sha1": + hashtype = "sha" + mod = __import__(hashtype) + hash = getattr(mod, hashtype)() + except (AttributeError, ImportError): + raise ValueError("Don't know how to compute %r hash" %(hashtype,)) + f = self.open('rb') + try: + while 1: + buf = f.read(chunksize) + if not buf: + return hash.hexdigest() + hash.update(buf) + finally: + f.close() + + def new(self, **kw): + """ create a modified version of this path. + the following keyword arguments modify various path parts:: + + a:/some/path/to/a/file.ext + xx drive + xxxxxxxxxxxxxxxxx dirname + xxxxxxxx basename + xxxx purebasename + xxx ext + """ + obj = object.__new__(self.__class__) + if not kw: + obj.strpath = self.strpath + return obj + drive, dirname, basename, purebasename,ext = self._getbyspec( + "drive,dirname,basename,purebasename,ext") + if 'basename' in kw: + if 'purebasename' in kw or 'ext' in kw: + raise ValueError("invalid specification %r" % kw) + else: + pb = kw.setdefault('purebasename', purebasename) + try: + ext = kw['ext'] + except KeyError: + pass + else: + if ext and not ext.startswith('.'): + ext = '.' + ext + kw['basename'] = pb + ext + + if ('dirname' in kw and not kw['dirname']): + kw['dirname'] = drive + else: + kw.setdefault('dirname', dirname) + kw.setdefault('sep', self.sep) + obj.strpath = normpath( + "%(dirname)s%(sep)s%(basename)s" % kw) + return obj + + def _getbyspec(self, spec): + """ see new for what 'spec' can be. """ + res = [] + parts = self.strpath.split(self.sep) + + args = filter(None, spec.split(',') ) + append = res.append + for name in args: + if name == 'drive': + append(parts[0]) + elif name == 'dirname': + append(self.sep.join(parts[:-1])) + else: + basename = parts[-1] + if name == 'basename': + append(basename) + else: + i = basename.rfind('.') + if i == -1: + purebasename, ext = basename, '' + else: + purebasename, ext = basename[:i], basename[i:] + if name == 'purebasename': + append(purebasename) + elif name == 'ext': + append(ext) + else: + raise ValueError("invalid part specification %r" % name) + return res + + def dirpath(self, *args, **kwargs): + """ return the directory path joined with any given path arguments. """ + if not kwargs: + path = object.__new__(self.__class__) + path.strpath = dirname(self.strpath) + if args: + path = path.join(*args) + return path + return super(LocalPath, self).dirpath(*args, **kwargs) + + def join(self, *args, **kwargs): + """ return a new path by appending all 'args' as path + components. if abs=1 is used restart from root if any + of the args is an absolute path. + """ + sep = self.sep + strargs = [fspath(arg) for arg in args] + strpath = self.strpath + if kwargs.get('abs'): + newargs = [] + for arg in reversed(strargs): + if isabs(arg): + strpath = arg + strargs = newargs + break + newargs.insert(0, arg) + # special case for when we have e.g. strpath == "/" + actual_sep = "" if strpath.endswith(sep) else sep + for arg in strargs: + arg = arg.strip(sep) + if iswin32: + # allow unix style paths even on windows. + arg = arg.strip('/') + arg = arg.replace('/', sep) + strpath = strpath + actual_sep + arg + actual_sep = sep + obj = object.__new__(self.__class__) + obj.strpath = normpath(strpath) + return obj + + def open(self, mode='r', ensure=False, encoding=None): + """ return an opened file with the given mode. + + If ensure is True, create parent directories if needed. + """ + if ensure: + self.dirpath().ensure(dir=1) + if encoding: + return error.checked_call(io.open, self.strpath, mode, encoding=encoding) + return error.checked_call(open, self.strpath, mode) + + def _fastjoin(self, name): + child = object.__new__(self.__class__) + child.strpath = self.strpath + self.sep + name + return child + + def islink(self): + return islink(self.strpath) + + def check(self, **kw): + if not kw: + return exists(self.strpath) + if len(kw) == 1: + if "dir" in kw: + return not kw["dir"] ^ isdir(self.strpath) + if "file" in kw: + return not kw["file"] ^ isfile(self.strpath) + return super(LocalPath, self).check(**kw) + + _patternchars = set("*?[" + os.path.sep) + def listdir(self, fil=None, sort=None): + """ list directory contents, possibly filter by the given fil func + and possibly sorted. + """ + if fil is None and sort is None: + names = error.checked_call(os.listdir, self.strpath) + return map_as_list(self._fastjoin, names) + if isinstance(fil, py.builtin._basestring): + if not self._patternchars.intersection(fil): + child = self._fastjoin(fil) + if exists(child.strpath): + return [child] + return [] + fil = FNMatcher(fil) + names = error.checked_call(os.listdir, self.strpath) + res = [] + for name in names: + child = self._fastjoin(name) + if fil is None or fil(child): + res.append(child) + self._sortlist(res, sort) + return res + + def size(self): + """ return size of the underlying file object """ + return self.stat().size + + def mtime(self): + """ return last modification time of the path. """ + return self.stat().mtime + + def copy(self, target, mode=False, stat=False): + """ copy path to target. + + If mode is True, will copy copy permission from path to target. + If stat is True, copy permission, last modification + time, last access time, and flags from path to target. + """ + if self.check(file=1): + if target.check(dir=1): + target = target.join(self.basename) + assert self!=target + copychunked(self, target) + if mode: + copymode(self.strpath, target.strpath) + if stat: + copystat(self, target) + else: + def rec(p): + return p.check(link=0) + for x in self.visit(rec=rec): + relpath = x.relto(self) + newx = target.join(relpath) + newx.dirpath().ensure(dir=1) + if x.check(link=1): + newx.mksymlinkto(x.readlink()) + continue + elif x.check(file=1): + copychunked(x, newx) + elif x.check(dir=1): + newx.ensure(dir=1) + if mode: + copymode(x.strpath, newx.strpath) + if stat: + copystat(x, newx) + + def rename(self, target): + """ rename this path to target. """ + target = fspath(target) + return error.checked_call(os.rename, self.strpath, target) + + def dump(self, obj, bin=1): + """ pickle object into path location""" + f = self.open('wb') + import pickle + try: + error.checked_call(pickle.dump, obj, f, bin) + finally: + f.close() + + def mkdir(self, *args): + """ create & return the directory joined with args. """ + p = self.join(*args) + error.checked_call(os.mkdir, fspath(p)) + return p + + def write_binary(self, data, ensure=False): + """ write binary data into path. If ensure is True create + missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + with self.open('wb') as f: + f.write(data) + + def write_text(self, data, encoding, ensure=False): + """ write text data into path using the specified encoding. + If ensure is True create missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + with self.open('w', encoding=encoding) as f: + f.write(data) + + def write(self, data, mode='w', ensure=False): + """ write data into path. If ensure is True create + missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + if 'b' in mode: + if not py.builtin._isbytes(data): + raise ValueError("can only process bytes") + else: + if not py.builtin._istext(data): + if not py.builtin._isbytes(data): + data = str(data) + else: + data = py.builtin._totext(data, sys.getdefaultencoding()) + f = self.open(mode) + try: + f.write(data) + finally: + f.close() + + def _ensuredirs(self): + parent = self.dirpath() + if parent == self: + return self + if parent.check(dir=0): + parent._ensuredirs() + if self.check(dir=0): + try: + self.mkdir() + except error.EEXIST: + # race condition: file/dir created by another thread/process. + # complain if it is not a dir + if self.check(dir=0): + raise + return self + + def ensure(self, *args, **kwargs): + """ ensure that an args-joined path exists (by default as + a file). if you specify a keyword argument 'dir=True' + then the path is forced to be a directory path. + """ + p = self.join(*args) + if kwargs.get('dir', 0): + return p._ensuredirs() + else: + p.dirpath()._ensuredirs() + if not p.check(file=1): + p.open('w').close() + return p + + def stat(self, raising=True): + """ Return an os.stat() tuple. """ + if raising == True: + return Stat(self, error.checked_call(os.stat, self.strpath)) + try: + return Stat(self, os.stat(self.strpath)) + except KeyboardInterrupt: + raise + except Exception: + return None + + def lstat(self): + """ Return an os.lstat() tuple. """ + return Stat(self, error.checked_call(os.lstat, self.strpath)) + + def setmtime(self, mtime=None): + """ set modification time for the given path. if 'mtime' is None + (the default) then the file's mtime is set to current time. + + Note that the resolution for 'mtime' is platform dependent. + """ + if mtime is None: + return error.checked_call(os.utime, self.strpath, mtime) + try: + return error.checked_call(os.utime, self.strpath, (-1, mtime)) + except error.EINVAL: + return error.checked_call(os.utime, self.strpath, (self.atime(), mtime)) + + def chdir(self): + """ change directory to self and return old current directory """ + try: + old = self.__class__() + except error.ENOENT: + old = None + error.checked_call(os.chdir, self.strpath) + return old + + + @contextmanager + def as_cwd(self): + """ + Return a context manager, which changes to the path's dir during the + managed "with" context. + On __enter__ it returns the old dir, which might be ``None``. + """ + old = self.chdir() + try: + yield old + finally: + if old is not None: + old.chdir() + + def realpath(self): + """ return a new path which contains no symbolic links.""" + return self.__class__(os.path.realpath(self.strpath)) + + def atime(self): + """ return last access time of the path. """ + return self.stat().atime + + def __repr__(self): + return 'local(%r)' % self.strpath + + def __str__(self): + """ return string representation of the Path. """ + return self.strpath + + def chmod(self, mode, rec=0): + """ change permissions to the given mode. If mode is an + integer it directly encodes the os-specific modes. + if rec is True perform recursively. + """ + if not isinstance(mode, int): + raise TypeError("mode %r must be an integer" % (mode,)) + if rec: + for x in self.visit(rec=rec): + error.checked_call(os.chmod, str(x), mode) + error.checked_call(os.chmod, self.strpath, mode) + + def pypkgpath(self): + """ return the Python package path by looking for the last + directory upwards which still contains an __init__.py. + Return None if a pkgpath can not be determined. + """ + pkgpath = None + for parent in self.parts(reverse=True): + if parent.isdir(): + if not parent.join('__init__.py').exists(): + break + if not isimportable(parent.basename): + break + pkgpath = parent + return pkgpath + + def _ensuresyspath(self, ensuremode, path): + if ensuremode: + s = str(path) + if ensuremode == "append": + if s not in sys.path: + sys.path.append(s) + else: + if s != sys.path[0]: + sys.path.insert(0, s) + + def pyimport(self, modname=None, ensuresyspath=True): + """ return path as an imported python module. + + If modname is None, look for the containing package + and construct an according module name. + The module will be put/looked up in sys.modules. + if ensuresyspath is True then the root dir for importing + the file (taking __init__.py files into account) will + be prepended to sys.path if it isn't there already. + If ensuresyspath=="append" the root dir will be appended + if it isn't already contained in sys.path. + if ensuresyspath is False no modification of syspath happens. + + Special value of ensuresyspath=="importlib" is intended + purely for using in pytest, it is capable only of importing + separate .py files outside packages, e.g. for test suite + without any __init__.py file. It effectively allows having + same-named test modules in different places and offers + mild opt-in via this option. Note that it works only in + recent versions of python. + """ + if not self.check(): + raise error.ENOENT(self) + + if ensuresyspath == 'importlib': + if modname is None: + modname = self.purebasename + if not ALLOW_IMPORTLIB_MODE: + raise ImportError( + "Can't use importlib due to old version of Python") + spec = importlib.util.spec_from_file_location( + modname, str(self)) + if spec is None: + raise ImportError( + "Can't find module %s at location %s" % + (modname, str(self)) + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + pkgpath = None + if modname is None: + pkgpath = self.pypkgpath() + if pkgpath is not None: + pkgroot = pkgpath.dirpath() + names = self.new(ext="").relto(pkgroot).split(self.sep) + if names[-1] == "__init__": + names.pop() + modname = ".".join(names) + else: + pkgroot = self.dirpath() + modname = self.purebasename + + self._ensuresyspath(ensuresyspath, pkgroot) + __import__(modname) + mod = sys.modules[modname] + if self.basename == "__init__.py": + return mod # we don't check anything as we might + # be in a namespace package ... too icky to check + modfile = mod.__file__ + if modfile[-4:] in ('.pyc', '.pyo'): + modfile = modfile[:-1] + elif modfile.endswith('$py.class'): + modfile = modfile[:-9] + '.py' + if modfile.endswith(os.path.sep + "__init__.py"): + if self.basename != "__init__.py": + modfile = modfile[:-12] + try: + issame = self.samefile(modfile) + except error.ENOENT: + issame = False + if not issame: + ignore = os.getenv('PY_IGNORE_IMPORTMISMATCH') + if ignore != '1': + raise self.ImportMismatchError(modname, modfile, self) + return mod + else: + try: + return sys.modules[modname] + except KeyError: + # we have a custom modname, do a pseudo-import + import types + mod = types.ModuleType(modname) + mod.__file__ = str(self) + sys.modules[modname] = mod + try: + py.builtin.execfile(str(self), mod.__dict__) + except: + del sys.modules[modname] + raise + return mod + + def sysexec(self, *argv, **popen_opts): + """ return stdout text from executing a system child process, + where the 'self' path points to executable. + The process is directly invoked and not through a system shell. + """ + from subprocess import Popen, PIPE + argv = map_as_list(str, argv) + popen_opts['stdout'] = popen_opts['stderr'] = PIPE + proc = Popen([str(self)] + argv, **popen_opts) + stdout, stderr = proc.communicate() + ret = proc.wait() + if py.builtin._isbytes(stdout): + stdout = py.builtin._totext(stdout, sys.getdefaultencoding()) + if ret != 0: + if py.builtin._isbytes(stderr): + stderr = py.builtin._totext(stderr, sys.getdefaultencoding()) + raise py.process.cmdexec.Error(ret, ret, str(self), + stdout, stderr,) + return stdout + + def sysfind(cls, name, checker=None, paths=None): + """ return a path object found by looking at the systems + underlying PATH specification. If the checker is not None + it will be invoked to filter matching paths. If a binary + cannot be found, None is returned + Note: This is probably not working on plain win32 systems + but may work on cygwin. + """ + if isabs(name): + p = py.path.local(name) + if p.check(file=1): + return p + else: + if paths is None: + if iswin32: + paths = os.environ['Path'].split(';') + if '' not in paths and '.' not in paths: + paths.append('.') + try: + systemroot = os.environ['SYSTEMROOT'] + except KeyError: + pass + else: + paths = [path.replace('%SystemRoot%', systemroot) + for path in paths] + else: + paths = os.environ['PATH'].split(':') + tryadd = [] + if iswin32: + tryadd += os.environ['PATHEXT'].split(os.pathsep) + tryadd.append("") + + for x in paths: + for addext in tryadd: + p = py.path.local(x).join(name, abs=True) + addext + try: + if p.check(file=1): + if checker: + if not checker(p): + continue + return p + except error.EACCES: + pass + return None + sysfind = classmethod(sysfind) + + def _gethomedir(cls): + try: + x = os.environ['HOME'] + except KeyError: + try: + x = os.environ["HOMEDRIVE"] + os.environ['HOMEPATH'] + except KeyError: + return None + return cls(x) + _gethomedir = classmethod(_gethomedir) + + # """ + # special class constructors for local filesystem paths + # """ + @classmethod + def get_temproot(cls): + """ return the system's temporary directory + (where tempfiles are usually created in) + """ + import tempfile + return py.path.local(tempfile.gettempdir()) + + @classmethod + def mkdtemp(cls, rootdir=None): + """ return a Path object pointing to a fresh new temporary directory + (which we created ourself). + """ + import tempfile + if rootdir is None: + rootdir = cls.get_temproot() + return cls(error.checked_call(tempfile.mkdtemp, dir=str(rootdir))) + + def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3, + lock_timeout=172800): # two days + """ return unique directory with a number greater than the current + maximum one. The number is assumed to start directly after prefix. + if keep is true directories with a number less than (maxnum-keep) + will be removed. If .lock files are used (lock_timeout non-zero), + algorithm is multi-process safe. + """ + if rootdir is None: + rootdir = cls.get_temproot() + + nprefix = prefix.lower() + def parse_num(path): + """ parse the number out of a path (if it matches the prefix) """ + nbasename = path.basename.lower() + if nbasename.startswith(nprefix): + try: + return int(nbasename[len(nprefix):]) + except ValueError: + pass + + def create_lockfile(path): + """ exclusively create lockfile. Throws when failed """ + mypid = os.getpid() + lockfile = path.join('.lock') + if hasattr(lockfile, 'mksymlinkto'): + lockfile.mksymlinkto(str(mypid)) + else: + fd = error.checked_call(os.open, str(lockfile), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) + with os.fdopen(fd, 'w') as f: + f.write(str(mypid)) + return lockfile + + def atexit_remove_lockfile(lockfile): + """ ensure lockfile is removed at process exit """ + mypid = os.getpid() + def try_remove_lockfile(): + # in a fork() situation, only the last process should + # remove the .lock, otherwise the other processes run the + # risk of seeing their temporary dir disappear. For now + # we remove the .lock in the parent only (i.e. we assume + # that the children finish before the parent). + if os.getpid() != mypid: + return + try: + lockfile.remove() + except error.Error: + pass + atexit.register(try_remove_lockfile) + + # compute the maximum number currently in use with the prefix + lastmax = None + while True: + maxnum = -1 + for path in rootdir.listdir(): + num = parse_num(path) + if num is not None: + maxnum = max(maxnum, num) + + # make the new directory + try: + udir = rootdir.mkdir(prefix + str(maxnum+1)) + if lock_timeout: + lockfile = create_lockfile(udir) + atexit_remove_lockfile(lockfile) + except (error.EEXIST, error.ENOENT, error.EBUSY): + # race condition (1): another thread/process created the dir + # in the meantime - try again + # race condition (2): another thread/process spuriously acquired + # lock treating empty directory as candidate + # for removal - try again + # race condition (3): another thread/process tried to create the lock at + # the same time (happened in Python 3.3 on Windows) + # https://ci.appveyor.com/project/pytestbot/py/build/1.0.21/job/ffi85j4c0lqwsfwa + if lastmax == maxnum: + raise + lastmax = maxnum + continue + break + + def get_mtime(path): + """ read file modification time """ + try: + return path.lstat().mtime + except error.Error: + pass + + garbage_prefix = prefix + 'garbage-' + + def is_garbage(path): + """ check if path denotes directory scheduled for removal """ + bn = path.basename + return bn.startswith(garbage_prefix) + + # prune old directories + udir_time = get_mtime(udir) + if keep and udir_time: + for path in rootdir.listdir(): + num = parse_num(path) + if num is not None and num <= (maxnum - keep): + try: + # try acquiring lock to remove directory as exclusive user + if lock_timeout: + create_lockfile(path) + except (error.EEXIST, error.ENOENT, error.EBUSY): + path_time = get_mtime(path) + if not path_time: + # assume directory doesn't exist now + continue + if abs(udir_time - path_time) < lock_timeout: + # assume directory with lockfile exists + # and lock timeout hasn't expired yet + continue + + # path dir locked for exclusive use + # and scheduled for removal to avoid another thread/process + # treating it as a new directory or removal candidate + garbage_path = rootdir.join(garbage_prefix + str(uuid.uuid4())) + try: + path.rename(garbage_path) + garbage_path.remove(rec=1) + except KeyboardInterrupt: + raise + except: # this might be error.Error, WindowsError ... + pass + if is_garbage(path): + try: + path.remove(rec=1) + except KeyboardInterrupt: + raise + except: # this might be error.Error, WindowsError ... + pass + + # make link... + try: + username = os.environ['USER'] #linux, et al + except KeyError: + try: + username = os.environ['USERNAME'] #windows + except KeyError: + username = 'current' + + src = str(udir) + dest = src[:src.rfind('-')] + '-' + username + try: + os.unlink(dest) + except OSError: + pass + try: + os.symlink(src, dest) + except (OSError, AttributeError, NotImplementedError): + pass + + return udir + make_numbered_dir = classmethod(make_numbered_dir) + + +def copymode(src, dest): + """ copy permission from src to dst. """ + import shutil + shutil.copymode(src, dest) + + +def copystat(src, dest): + """ copy permission, last modification time, + last access time, and flags from src to dst.""" + import shutil + shutil.copystat(str(src), str(dest)) + + +def copychunked(src, dest): + chunksize = 524288 # half a meg of bytes + fsrc = src.open('rb') + try: + fdest = dest.open('wb') + try: + while 1: + buf = fsrc.read(chunksize) + if not buf: + break + fdest.write(buf) + finally: + fdest.close() + finally: + fsrc.close() + + +def isimportable(name): + if name and (name[0].isalpha() or name[0] == '_'): + name = name.replace("_", '') + return not name or name.isalnum() + +local = LocalPath diff --git a/src/py.py b/src/py.py new file mode 100644 index 00000000000..c4f049e360f --- /dev/null +++ b/src/py.py @@ -0,0 +1,10 @@ +# shim for pylib going away +# if pylib is installed this file will get skipped +# (`py/__init__.py` has higher precedence) +import sys + +import _pytest._py.error as error +import _pytest._py.path as path + +sys.modules['py.error'] = error +sys.modules['py.path'] = path