Source code for pfio.v2.local

import io
import os
import pathlib
import shutil
from typing import Optional

from pfio._profiler import record, record_iterable

from .fs import FS, FileStat, format_repr


class LocalProfileIOWrapper:
    def __init__(self, fp):
        self.fp = fp

    def __enter__(self):
        self.fp.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        with record("pfio.v2.Local:exit-context", trace=True):
            self.fp.__exit__(exc_type, exc_value, traceback)

    def __getattr__(self, name):
        attr = getattr(self.fp, name)
        if callable(attr):
            def wrapper(*args, **kwargs):
                with record(f"pfio.v2.Local:{attr.__name__}", trace=True):
                    return attr(*args, **kwargs)
            return wrapper
        else:
            return attr


class LocalFileStat(FileStat):
    """Detailed information of a POSIX file

    The information of file/directory is obtained through the `os.stat`.

    Attributes:
        filename (str): Derived from `~FileStat`.
        last_modified (float): Derived from `~FileStat`.
            ``os.stat_result.st_mtime``.
        last_accessed (float): ``os.stat_result.st_atime``.
        created (float): ``os.stat_result.st_ctime``.
        last_modified_ns (int): ``os.stat_result.st_mtime_ns``.
        last_accessed_ns (int): ``os.stat_result.st_atime_ns``.
        created_ns (float): ``os.stat_result.st_ctime``.
        mode (int): Derived from `~FileStat`. ``os.stat_result.st_mode``.
        size (int): Derived from `~FileStat`. ``os.stat_result.st_size``.
        owner (int): UID of owner in integer.
        group (int): GID of the file in integer.
        inode (int): ``os.stat_result.st_ino``.
        device (int): ``os.stat_result.st_dev``.
        nlink (int): ``os.stat_result.st_nlink``.
    """

    def __init__(self, _stat, filename):
        keys = (('last_modified', 'st_mtime'),
                ('last_accessed', 'st_atime'),
                ('last_modified_ns', 'st_mtime_ns'),
                ('last_accessed_ns', 'st_atime_ns'),
                ('created', 'st_ctime'), ('created_ns', 'st_ctime_ns'),
                ('mode', 'st_mode'), ('size', 'st_size'), ('uid', 'st_uid'),
                ('gid', 'st_gid'), ('ino', 'st_ino'), ('dev', 'st_dev'),
                ('nlink', 'st_nlink'))
        for k, ksrc in keys:
            setattr(self, k, getattr(_stat, ksrc))
        self.filename = filename


[docs] class Local(FS): def __init__(self, cwd=None, trace=False, create=False, scheme=None, **_): super().__init__(scheme=scheme) self.trace = trace if cwd is None: self._cwd = '' else: self._cwd = cwd if not self.isdir(''): if create: # Since this process (isdir -> makedirs) is not atomic, # makedirs can conflict in case of a parallel workload. os.makedirs(self._cwd, exist_ok=True) else: raise ValueError('{} must be a directory'.format(self._cwd)) @property def cwd(self): if self._cwd: return self._cwd return os.getcwd() @cwd.setter def cwd(self, value: str): self._cwd = value def _reset(self): pass def __repr__(self): return format_repr( Local, { "cwd": self._cwd, }, ) def open(self, file_path, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None): with record("pfio.v2.Local:open", trace=self.trace): path = os.path.join(self.cwd, file_path) fp = io.open(path, mode, buffering, encoding, errors, newline, closefd, opener) # Add ppe recorder to io class methods (e.g. read, write) if self.trace: return LocalProfileIOWrapper(fp) else: return fp
[docs] def list(self, path: Optional[str] = '', recursive=False, detail=False): for e in record_iterable("pfio.v2.Local:list", self._list(path, recursive, detail), trace=self.trace): yield e
def _list(self, path: Optional[str] = '', recursive=False, detail=False): path_or_prefix = os.path.join(self.cwd, "" if path is None else path) if recursive: path_or_prefix = path_or_prefix.rstrip("/") # plus 1 to include the trailing slash prefix_end_index = len(path_or_prefix) + 1 yield from self._recursive_list(prefix_end_index, path_or_prefix, detail) else: for e in os.scandir(path_or_prefix): # ls -F if detail: yield LocalFileStat(e.stat(), e.name) elif e.is_dir(): yield e.name + '/' else: yield e.name def _recursive_list(self, prefix_end_index: int, path: str, detail: bool): for e in os.scandir(path): # ls -F if detail: yield LocalFileStat(e.stat(), e.name) elif e.is_dir(): yield e.path[prefix_end_index:] + '/' else: yield e.path[prefix_end_index:] if e.is_dir(): yield from self._recursive_list(prefix_end_index, e.path, detail)
[docs] def stat(self, path): with record("pfio.v2.Local:stat", trace=self.trace): path = os.path.join(self.cwd, path) return LocalFileStat(os.stat(path), path)
[docs] def isdir(self, path: str): path = os.path.join(self.cwd, path) return os.path.isdir(path)
[docs] def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None): with record("pfio.v2.Local:mkdir", trace=self.trace): path = os.path.join(self.cwd, file_path) return os.mkdir(path, mode, *args, dir_fd=None)
[docs] def makedirs(self, file_path: str, mode=0o777, exist_ok=False): with record("pfio.v2.Local:makedirs", trace=self.trace): path = os.path.join(self.cwd, file_path) return os.makedirs(path, mode, exist_ok)
[docs] def exists(self, file_path: str): with record("pfio.v2.Local:exists", trace=self.trace): path = os.path.join(self.cwd, file_path) return os.path.exists(path)
[docs] def rename(self, src, dst): with record("pfio.v2.Local:rename", trace=self.trace): s = os.path.join(self.cwd, src) d = os.path.join(self.cwd, dst) return os.rename(s, d)
[docs] def remove(self, file_path: str, recursive=False): with record("pfio.v2.Local:remove", trace=self.trace): file_path = os.path.join(self.cwd, file_path) if recursive: return shutil.rmtree(file_path) if os.path.isdir(file_path): return os.rmdir(file_path) return os.remove(file_path)
[docs] def glob(self, pattern: str): with record("pfio.v2.Local:glob", trace=self.trace): return [ str(item.relative_to(self.cwd)) for item in pathlib.Path(self.cwd).glob(pattern)]
def _canonical_name(self, file_path: str) -> str: return "file:/" + os.path.normpath(os.path.join(self.cwd, file_path))