Source code for pfio.v2.pathlib

import fnmatch
import os
from pathlib import PurePath

from .fs import FS
from .local import Local


[docs] class Path: sep = '/' def __init__(self, *args, fs=None, root=None): ''' Empty argument indicates current directory in fs, fs.cwd ''' self._fs = fs if fs else Local() assert isinstance(self._fs, FS) sep = self.sep if root: if not root.startswith(self.sep): raise ValueError("root must start with a separator.") if root == self.sep: args = [self.sep] + args else: args = [self.sep, root[len(self.sep):]] + args # Construct parts parts = [] for argv in args: p = argv if p.startswith(sep): parts = [sep] p = p[len(sep):] if len(p) > 0: parts += [q for q in p.split(sep) if q not in ("", ".")] # Check assert "" not in parts if parts and parts[0] == self.sep: assert all(self.sep not in p for p in parts[1:]) else: assert all(self.sep not in p for p in parts) self._parts = parts @property def name(self): if not self._parts: return "" if len(self._parts) == 1 and self._parts[0] == self.sep: return "" return os.path.normpath(self._parts[-1]) @property def suffix(self): return os.path.splitext(self.resolve())[1] def with_suffix(self, ext): assert ext.startswith('.') name = self.name if name == "": raise ValueError(f"{self} has an empty name") base, _ = os.path.splitext(name) return self.parent / f"{base}{ext}" @property def parent(self): if not self._parts: return self if self._parts[0] == self.sep and len(self._parts) == 1: return self return Path(*(self._parts[:-1]), fs=self._fs) @property def parts(self): return tuple(self._parts) @property def root(self): if self._parts and self._parts[0] == self.sep: return self.sep return "" def __rtruediv__(self, a): if isinstance(a, str): lhs = Path(a, fs=self._fs) else: lhs = a return lhs / self def __truediv__(self, a): parts = self._parts[:] if isinstance(a, Path): parts += a._parts elif isinstance(a, PurePath): raise RuntimeError("mixed") else: parts.append(str(a)) p = Path(*parts, fs=self._fs) return p @classmethod def cwd(cls): raise NotImplementedError("cwd() is unsupported on this system") @classmethod def home(cls): raise NotImplementedError("home() is unsupported on this system") def exists(self): return self._fs.exists(str(self.resolve())) def is_absolute(self): return self._parts and self._parts[0] == self.sep def __repr__(self): return "{}[{}:{}]".format(self.__class__.__name__, self._fs.__class__.__name__, str(self)) def __str__(self): if not self._parts: return "." if self._parts[0] == self.sep: return self.sep + self.sep.join(self._parts[1:]) return self.sep.join(self._parts) def __fspath__(self): return str(self) def __lt__(self, other): return str(self) < str(other) def resolve(self, strict=True): if '..' in self._parts or '.' in self._parts: raise RuntimeError("TODO") return Path(*(self._fs.cwd, *self._parts), fs=self._fs) def samefile(self, other): # TODO: Compare self._fs return str(self.resolve()) == str(other.resolve()) def touch(self): with self._fs.open(self.resolve(), 'wb') as fp: fp.write(b'') def open(self, mode='r', **kwargs): # TODO: handle or warn on ignoring these keyword arguments # buffering=-1, encoding=None, errors=None, newline=None): return self._fs.open(self.resolve(), mode) def mkdir(self, mode=0o777, parents=False, exist_ok=False): if parents: return self._fs.makedirs(self.resolve(), mode, exist_ok) try: self._fs.mkdir(self.resolve(), mode) except FileExistsError as e: if not exist_ok: raise e else: pass def is_dir(self): return self._fs.isdir(self.resolve()) def is_file(self): return not self._fs.isdir(self.resolve()) def iterdir(self): return self.glob("*")
[docs] def glob(self, pattern): '''glob It may be slow depending on the filesystem type. ''' # Try specialized implementation with self._as_fs() as fs: try: generator = fs.glob(pattern) except NotImplementedError: pass else: return (self / f for f in generator) # Fall back to slower generic implementation return self._glob_generic(pattern)
def _glob_generic(self, pattern): '''Slow and generic implementation of glob.''' base = self.resolve() base_parts = base._parts pattern_parts = (base / pattern)._parts visited_prefixes = set() for p in self._fs.list(base, recursive=True): parent = p while (i := parent.rfind(self.sep)) != -1: parent = parent[:i] if parent in visited_prefixes: continue visited_prefixes.add(parent) target_parts = (base / parent)._parts if _test_glob_by_parts(target_parts, pattern_parts): yield self / parent target_parts = base_parts + p.split("/") if _test_glob_by_parts(target_parts, pattern_parts): yield self / p def stat(self): return self._fs.stat(self.resolve()) def unlink(self, missing_ok=False): return self._fs.remove(self.resolve()) def rename(self, target): raise NotImplementedError("rename() is unsupported on this system") def read_text(self): with self.open() as fp: return fp.read() def read_bytes(self): with self.open(mode='rb') as fp: return fp.read() def write_text(self, data): with self.open(mode='w') as fp: return fp.write(data) def write_bytes(self, data): view = memoryview(data) with self.open(mode='wb') as fp: return fp.write(view) def _as_fs(self): return self._fs._newfs(str(self.resolve()))
def _test_glob_by_parts(target_parts, pattern_parts): target_parts = target_parts[:] pattern_parts = pattern_parts[:] while True: if len(target_parts) == 0 and len(pattern_parts) == 0: return True if len(target_parts) == 0 or len(pattern_parts) == 0: return False p = pattern_parts.pop(0) if p == "**": for i in range(0, len(target_parts)): if _test_glob_by_parts(target_parts[i:], pattern_parts): return True return False t = target_parts.pop(0) if not fnmatch.fnmatch(t, p): return False