diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/common/system/filesystem_mock.py')
-rw-r--r-- | Tools/Scripts/webkitpy/common/system/filesystem_mock.py | 191 |
1 files changed, 170 insertions, 21 deletions
diff --git a/Tools/Scripts/webkitpy/common/system/filesystem_mock.py b/Tools/Scripts/webkitpy/common/system/filesystem_mock.py index 809c4c6..0004944 100644 --- a/Tools/Scripts/webkitpy/common/system/filesystem_mock.py +++ b/Tools/Scripts/webkitpy/common/system/filesystem_mock.py @@ -43,10 +43,82 @@ class MockFileSystem(object): not exist. """ self.files = files or {} + self.written_files = {} + self.sep = '/' + self.current_tmpno = 0 + + def _raise_not_found(self, path): + raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT)) + + def _split(self, path): + idx = path.rfind('/') + return (path[0:idx], path[idx + 1:]) + + def abspath(self, path): + return path + + def basename(self, path): + return self._split(path)[1] + + def copyfile(self, source, destination): + if not self.exists(source): + self._raise_not_found(source) + if self.isdir(source): + raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR)) + if self.isdir(destination): + raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR)) + + self.files[destination] = self.files[source] + + def dirname(self, path): + return self._split(path)[0] def exists(self, path): return self.isfile(path) or self.isdir(path) + def files_under(self, path, dirs_to_skip=[], file_filter=None): + def filter_all(fs, dirpath, basename): + return True + + file_filter = file_filter or filter_all + files = [] + if self.isfile(path): + if file_filter(self, self.dirname(path), self.basename(path)): + files.append(path) + return files + + if self.basename(path) in dirs_to_skip: + return [] + + if not path.endswith('/'): + path += '/' + + dir_substrings = ['/' + d + '/' for d in dirs_to_skip] + for filename in self.files: + if not filename.startswith(path): + continue + + suffix = filename[len(path) - 1:] + if any(dir_substring in suffix for dir_substring in dir_substrings): + continue + + dirpath, basename = self._split(filename) + if file_filter(self, dirpath, basename): + files.append(filename) + + return files + + def glob(self, path): + # FIXME: This only handles a wildcard '*' at the end of the path. + # Maybe it should handle more? + if path[-1] == '*': + return [f for f in self.files if f.startswith(path[:-1])] + else: + return [f for f in self.files if f == path] + + def isabs(self, path): + return path.startswith('/') + def isfile(self, path): return path in self.files and self.files[path] is not None @@ -55,7 +127,12 @@ class MockFileSystem(object): return False if not path.endswith('/'): path += '/' - return any(f.startswith(path) for f in self.files) + + # We need to use a copy of the keys here in order to avoid switching + # to a different thread and potentially modifying the dict in + # mid-iteration. + files = self.files.keys()[:] + return any(f.startswith(path) for f in files) def join(self, *comps): return re.sub(re.escape(os.path.sep), '/', os.path.join(*comps)) @@ -80,42 +157,114 @@ class MockFileSystem(object): files.append(remaining) return dirs + files + def mtime(self, path): + if self.exists(path): + return 0 + self._raise_not_found(path) + + def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs): + if dir is None: + dir = '/__im_tmp' + curno = self.current_tmpno + self.current_tmpno += 1 + return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix)) + + def mkdtemp(self, **kwargs): + class TemporaryDirectory(object): + def __init__(self, fs, **kwargs): + self._kwargs = kwargs + self._filesystem = fs + self._directory_path = fs._mktemp(**kwargs) + fs.maybe_make_directory(self._directory_path) + + def __str__(self): + return self._directory_path + + def __enter__(self): + return self._directory_path + + def __exit__(self, type, value, traceback): + # Only self-delete if necessary. + + # FIXME: Should we delete non-empty directories? + if self._filesystem.exists(self._directory_path): + self._filesystem.rmtree(self._directory_path) + + return TemporaryDirectory(fs=self, **kwargs) + def maybe_make_directory(self, *path): # FIXME: Implement such that subsequent calls to isdir() work? pass + def move(self, src, dst): + if self.files[src] is None: + self._raise_not_found(src) + self.files[dst] = self.files[src] + self.files[src] = None + + def normpath(self, path): + return path + + def open_binary_tempfile(self, suffix): + path = self._mktemp(suffix) + return WritableFileObject(self, path), path + + def open_text_file_for_writing(self, path, append=False): + return WritableFileObject(self, path, append) + def read_text_file(self, path): return self.read_binary_file(path) def read_binary_file(self, path): - if path in self.files: - if self.files[path] is None: - raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT)) - return self.files[path] + # Intentionally raises KeyError if we don't recognize the path. + if self.files[path] is None: + self._raise_not_found(path) + return self.files[path] + + def remove(self, path): + if self.files[path] is None: + self._raise_not_found(path) + self.files[path] = None + + def rmtree(self, path): + if not path.endswith('/'): + path += '/' + + for f in self.files: + if f.startswith(path): + self.files[f] = None + + def splitext(self, path): + idx = path.rfind('.') + if idx == -1: + idx = 0 + return (path[0:idx], path[idx:]) def write_text_file(self, path, contents): return self.write_binary_file(path, contents) def write_binary_file(self, path, contents): self.files[path] = contents + self.written_files[path] = contents - def copyfile(self, source, destination): - if not self.exists(source): - raise IOError(errno.ENOENT, source, os.strerror(errno.ENOENT)) - if self.isdir(source): - raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR)) - if self.isdir(destination): - raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR)) - self.files[destination] = self.files[source] +class WritableFileObject(object): + def __init__(self, fs, path, append=False, encoding=None): + self.fs = fs + self.path = path + self.closed = False + if path not in self.fs.files or not append: + self.fs.files[path] = "" - def files_under(self, path): - if not path.endswith('/'): - path += '/' - return [file for file in self.files if file.startswith(path)] + def __enter__(self): + return self - def remove(self, path): - del self.files[path] + def __exit__(self, type, value, traceback): + self.close() + + def close(self): + self.closed = True - def remove_tree(self, path, ignore_errors=False): - self.files = [file for file in self.files if not file.startswith(path)] + def write(self, str): + self.fs.files[self.path] += str + self.fs.written_files[self.path] = self.fs.files[self.path] |