import functools
import os.path
import uuid
import warnings
from fnmatch import fnmatch, fnmatchcase
from io import IOBase
from os import PathLike
from pathlib import PurePosixPath
from posixpath import join as joinpath
from posixpath import normpath
from sys import version_info as python_version_info
from typing import (Callable, Iterator, List, Optional, Sequence, Set, Tuple,
Type, TypeVar, Union)
from urllib.parse import ParseResult, urlunparse
from .fs import FS, FileStat
from .hdfs import Hdfs
from .local import Local
from .s3 import S3
from .zip import Zip
# Self-type variables: methods annotate `self` with these so that the
# declared return type is the caller's own (sub)class rather than the
# base class the method is defined on.
SelfPurePathType = TypeVar("SelfPurePathType", bound="PurePath")
SelfPathType = TypeVar("SelfPathType", bound="Path")
@functools.lru_cache(maxsize=16)
def _has_directory_feature(fs: FS) -> bool:
    """
    Tell whether `fs` materializes directories as real objects.

    Every file system other than `S3` is reported as directory-capable
    without probing. For `S3` a throw-away object is written under a
    randomly named prefix and removed again; if the prefix itself still
    exists afterwards, the backend created the directory implicitly.

    Args:
        fs: file system to probe.

    Returns:
        `True` if directories exist as objects on `fs`.

    Note:
        The result is memoized per `fs` (up to 16 entries), so the probe
        writes to the store at most once per cached file system.
    """
    if not isinstance(fs, S3):
        return True

    # NOTE: Apache Ozone supports directories depending on its
    #       configuration; see the `ozone.om.enable.filesystem.paths`
    #       property.
    probe_dir = f"__pfio_check_{str(uuid.uuid4())[:8]}"
    probe_file = f"{probe_dir}/__tmp"

    with fs.open(probe_file, mode="wb") as stream:
        stream.write(b"")
    fs.remove(probe_file)

    # NOTE: the directory is created automatically
    #       if `ozone.om.enable.filesystem.paths` is enabled.
    if not fs.exists(probe_dir):
        return False

    fs.remove(probe_dir)
    return True
def _compare_fs(lhs: FS, rhs: FS) -> bool:
    """
    Decide whether two file systems address the same storage location.

    Args:
        lhs: first file system.
        rhs: second file system.

    Returns:
        `False` when the two objects are of different types. Otherwise
        the result of comparing the identifying attributes of that type:
        `cwd` for `Local` and `Zip`, `cwd`/`bucket`/`endpoint` for `S3`,
        `cwd`/`username` for `Hdfs`.

    Raises:
        ValueError: both objects share a type that is none of the above.
    """
    if type(lhs) is not type(rhs):
        return False

    assert isinstance(lhs.cwd, str)
    assert isinstance(rhs.cwd, str)

    def both_are(kind: type) -> bool:
        return isinstance(lhs, kind) and isinstance(rhs, kind)

    if both_are(Local):
        return lhs.cwd == rhs.cwd
    if both_are(S3):
        return (
            lhs.cwd == rhs.cwd
            and lhs.bucket == rhs.bucket
            and lhs.endpoint == rhs.endpoint
        )
    if both_are(Hdfs):
        return lhs.cwd == rhs.cwd and lhs.username == rhs.username
    if both_are(Zip):
        return lhs.cwd == rhs.cwd

    raise ValueError(f"unsupported FS: {lhs} and {rhs}")
def _not_supported(name: Optional[str] = None) -> NotImplementedError:
if not name:
import inspect
stack = inspect.stack()
if "self" in stack[1].frame.f_locals:
cname = stack[1].frame.f_locals["self"].__class__.__name__
elif "cls" in stack[1].frame.f_locals:
cname = stack[1].frame.f_locals["cls"].__name__
else:
cname = ""
fname = stack[1].function
name = f"{cname}.{fname}" if cname else fname
return NotImplementedError(f"`{name}` is unsupported on this system")
class PurePath(PathLike):
    """
    `pathlib.PurePosixPath` compatible interface.

    Args:
        args: path segments, joined the way `PurePosixPath` joins them.
        fs: target file system.

    Note:
        It conforms to the `pathlib.PurePosixPath` specification of
        Python 3.12. This class does not inherit from any `pathlib`
        class because pfio filesystems are not suitable for the pathlib
        abstract and helper classes.
    """

    def __init__(
        self,
        *args: Union[str, PathLike],
        fs: FS,
    ) -> None:
        self._fs: FS = fs
        self._pure = PurePosixPath(*args)
        # NOTE(review): `__eq__` compares file systems by attributes
        # (see `_compare_fs`) while this hash uses `hash(fs)`. If `FS`
        # hashes by identity, equal paths on two equivalent FS objects
        # get different hashes -- confirm against the `FS` class.
        self._hash = hash(self._pure) + hash(self._fs) + hash(self.scheme)

    @property
    def sep(self) -> str:
        """Path separator; always `/`."""
        return "/"

    @property
    def scheme(self) -> str:
        """URI scheme reported by the underlying file system."""
        return self._fs.scheme

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: object) -> bool:
        # Equal when both the pure path and the file system location match.
        if isinstance(other, PurePath):
            return self._pure == other._pure and _compare_fs(
                self._fs, other._fs
            )
        else:
            return NotImplemented

    def __fspath__(self) -> str:
        return self._pure.__fspath__()

    def __str__(self) -> str:
        return self.__fspath__()

    def __repr__(self) -> str:
        return f"{type(self).__name__}('{self.__fspath__()}')"

    def __bytes__(self) -> bytes:
        return self._pure.__bytes__()

    def __truediv__(
        self: SelfPurePathType,
        other: Union[str, PathLike, SelfPurePathType],
    ) -> SelfPurePathType:
        try:
            return self.with_segments(self._pure / str(other))
        except TypeError:
            return NotImplemented

    def __rtruediv__(
        self: SelfPurePathType,
        other: Union[str, PathLike, SelfPurePathType],
    ) -> SelfPurePathType:
        try:
            return self.with_segments(str(other) / self._pure)
        except TypeError:
            return NotImplemented

    # ---------------------------------------
    # pathlib.PurePath compatible properties
    # ---------------------------------------
    @property
    def parts(self) -> Tuple[str, ...]:
        return self._pure.parts

    @property
    def drive(self) -> str:
        return self._pure.drive

    @property
    def root(self) -> str:
        return self._pure.root

    @property
    def anchor(self) -> str:
        return self._pure.anchor

    @property
    def parents(self: SelfPurePathType) -> Sequence[SelfPurePathType]:
        """Logical ancestors of this path, nearest first."""
        # Delegate to the stdlib path instead of rebuilding every
        # ancestor from sliced `parts`; the resulting sequence is the
        # same (an absolute path ends at its root, a relative one at ".").
        return [self.with_segments(p) for p in self._pure.parents]

    @property
    def parent(self: SelfPurePathType) -> SelfPurePathType:
        return self.with_segments(self._pure.parent)

    @property
    def name(self) -> str:
        return self._pure.name

    @property
    def suffix(self) -> str:
        return self._pure.suffix

    @property
    def suffixes(self) -> List[str]:
        return self._pure.suffixes

    @property
    def stem(self) -> str:
        return self._pure.stem

    # ---------------------------------------
    # pathlib.PurePath compatible methods
    # ---------------------------------------
    def as_posix(self) -> str:
        return self._pure.as_posix()

    def as_uri(self) -> str:
        """
        Render this path as `<scheme>://<netloc>/<fs.cwd>/<path>`.

        The network location is the bucket for `S3` and empty otherwise.

        Raises:
            ValueError: the path is not absolute.
            NotImplementedError: the file system is `Hdfs`.
        """
        if self.is_absolute():
            if isinstance(self._fs, S3):
                netloc = self._fs.bucket
                assert isinstance(netloc, str)
            elif isinstance(self._fs, Hdfs):
                raise NotImplementedError
            else:
                netloc = ""

            prefix = self._fs.cwd
            assert isinstance(prefix, str)
            # Drop the leading separator so the key joins under `prefix`
            # instead of replacing it.
            key = self.as_posix().removeprefix(self.sep)
            parsed = ParseResult(
                scheme=self.scheme,
                netloc=netloc,
                path=joinpath(prefix, key),
                params="",
                query="",
                fragment="",
            )
            return urlunparse(parsed)
        else:
            raise ValueError(f"'{self}' is not absolute path")

    def is_absolute(self) -> bool:
        return self._pure.is_absolute()

    def is_relative_to(self, *other: Union[str, PathLike]) -> bool:
        # Same rationale as `relative_to`: unwrap pfio PurePath so
        # 3.14's stdlib does not skip its `PurePosixPath` coercion,
        # and shim the multi-arg form that stdlib removed in 3.14.
        parts = tuple(
            o._pure if isinstance(o, PurePath) else o for o in other
        )
        if len(parts) > 1:
            # NOTE: drop this shim (and the `*other` signature) once
            # downstream callers have moved to a single joined path.
            warnings.warn(
                "passing multiple positional arguments to "
                "pfio.v2.pathlib.PurePath.is_relative_to is deprecated; "
                "pass a single joined path instead",
                DeprecationWarning,
                stacklevel=2,
            )
            parts = (PurePosixPath(*parts),)
        return self._pure.is_relative_to(*parts)  # type: ignore

    def is_reserved(self) -> bool:
        # `pathlib.PurePath.is_reserved` was deprecated in 3.13 and is
        # removed in 3.15; its replacement `os.path.isreserved` (added
        # in 3.13) is provided only by `ntpath`, so POSIX has no
        # successor.
        if python_version_info.minor < 14:
            return self._pure.is_reserved()
        if os.name == "nt":
            return os.path.isreserved(self._pure)  # type: ignore[attr-defined]
        return False

    def joinpath(
        self: SelfPurePathType,
        *args: Union[str, PathLike],
    ) -> SelfPurePathType:
        return self.with_segments(self._pure.joinpath(*args))

    def match(
        self,
        pattern: Union[str, PathLike],
        *,
        case_sensitive: bool = False,
    ) -> bool:
        """
        Test this path against an `fnmatch`-style pattern.

        The pattern is applied to the URI form of the path (`as_uri`).
        When no URI can be built -- the path is relative, or the file
        system does not implement `as_uri` -- the POSIX string form is
        matched instead of propagating the error.

        Args:
            pattern: `fnmatch` pattern.
            case_sensitive: use `fnmatchcase` when true, `fnmatch`
                otherwise.

        Returns:
            `True` if the pattern matches.
        """
        pattern = str(pattern)
        try:
            subject = self.as_uri()
        except (ValueError, NotImplementedError):
            # `as_uri` raises ValueError for relative paths and
            # NotImplementedError on HDFS; matching must still work.
            subject = self.as_posix()

        match_ = fnmatchcase if case_sensitive else fnmatch
        return match_(subject, pattern)

    def relative_to(
        self: SelfPurePathType,
        *other: Union[str, PathLike],
        walk_up: bool = False,
    ) -> SelfPurePathType:
        """
        Compute this path relative to `other`.

        Raises:
            NotImplementedError: `walk_up=True` on Python older than 3.12.
        """
        if python_version_info.minor < 12 and walk_up:
            raise NotImplementedError(
                "`walk_up=True` supports python version 3.12 or higher"
            )

        # Python 3.14 stdlib skips PurePosixPath coercion for objects
        # exposing `with_segments`, which pfio's PurePath does. Unwrap
        # to the inner stdlib path so subpath checks compare like-typed
        # values.
        parts = tuple(
            o._pure if isinstance(o, PurePath) else o for o in other
        )
        if len(parts) > 1:
            # NOTE: Python 3.14 removed the multi-arg form of
            # PurePath.relative_to from stdlib. pfio keeps the shape
            # here for now and joins the parts before delegating, with
            # a DeprecationWarning so callers can migrate. Drop this
            # shim (and the `*other` signature) once downstream has
            # moved to a single joined path argument.
            warnings.warn(
                "passing multiple positional arguments to "
                "pfio.v2.pathlib.PurePath.relative_to is deprecated; "
                "pass a single joined path instead",
                DeprecationWarning,
                stacklevel=2,
            )
            parts = (PurePosixPath(*parts),)

        if python_version_info.minor >= 12:
            rel = self._pure.relative_to(
                *parts,
                walk_up=walk_up,  # type: ignore
            )
        else:
            rel = self._pure.relative_to(*parts)
        return self.with_segments(rel)

    def with_name(self: SelfPurePathType, name: str) -> SelfPurePathType:
        return self.with_segments(self._pure.with_name(name))

    def with_stem(self: SelfPurePathType, stem: str) -> SelfPurePathType:
        return self.with_segments(self._pure.with_stem(stem))

    def with_suffix(self: SelfPurePathType, suffix: str) -> SelfPurePathType:
        return self.with_segments(self._pure.with_suffix(suffix))

    def with_segments(
        self: SelfPurePathType,
        *args: Union[str, PathLike],
    ) -> SelfPurePathType:
        """Create a path of the same type, bound to the same `fs`."""
        return type(self)(*args, fs=self._fs)
class Path(PurePath):
    """
    `pathlib.PosixPath` compatible interface.

    Args:
        args: path segments.
        fs: target file system.

    Note:
        many methods raise `NotImplementedError`
        because they require unsupported features of `FS`.

        several methods behave slightly differently:

        - `stat` returns a `FileStat` object instead of `os.stat_result`.
        - `glob`, `rglob`, `iterdir` will not return directory type object.
        - `touch` leaves an existing path untouched (it neither rewrites
          the content nor updates the modification time).
    """

    def __init__(
        self,
        *args: str,
        fs: FS,
    ) -> None:
        super().__init__(*args, fs=fs)

    def _as_relative_to_fs(self) -> str:
        # `FS` methods take paths relative to the file system, so the
        # anchor (leading "/") is stripped.
        return self.as_posix().removeprefix(self.anchor)

    # ---------------------------------------
    # pathlib.Path compatible classmethods
    # ---------------------------------------
    @classmethod
    def cwd(cls: Type[SelfPathType]) -> SelfPathType:
        raise _not_supported()

    @classmethod
    def home(cls: Type[SelfPathType]) -> SelfPathType:
        raise _not_supported()

    # ---------------------------------------
    # pathlib.Path compatible methods
    # ---------------------------------------
    def stat(self, *, follow_symlinks: bool = True) -> FileStat:
        if not follow_symlinks:
            raise _not_supported("follow_symlinks=False")

        return self._fs.stat(self._as_relative_to_fs())

    def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
        # `mode` is positional-or-keyword, as in `pathlib.Path.chmod`,
        # so `chmod(0o644)` reports "unsupported" instead of TypeError.
        raise _not_supported()

    def exists(self, *, follow_symlinks: bool = True) -> bool:
        if not follow_symlinks:
            raise _not_supported("follow_symlinks=False")

        return self.is_file() or self.is_dir()

    def expanduser(self: SelfPathType) -> SelfPathType:
        raise _not_supported()

    def glob(
        self: SelfPathType,
        pattern: str,
        *,
        case_sensitive: Optional[bool] = None,
    ) -> Iterator[SelfPathType]:
        """
        Yield entries below this path whose string form matches `pattern`.

        Entries come from `FS.list`; each distinct path is yielded once.
        A `FileNotFoundError` raised while listing ends the iteration
        silently.

        Raises:
            NotImplementedError: `case_sensitive` is not `None`.
        """
        if case_sensitive is not None:
            raise _not_supported("case_sensitive=True or False")

        # NOTE: `case_sensitive` is always `None` past the guard above,
        #       so this currently always selects `fnmatch`.
        match_ = fnmatchcase if case_sensitive else fnmatch
        p = self._as_relative_to_fs()

        recursive = "**" in pattern
        # NOTE: `pathlib.Path.glob` interprets "**/*" to "*"
        if pattern.endswith("**/*"):
            pattern = pattern.replace("**/*", "*")
        pattern = pattern.replace("**", "*")

        prefixes: Set[SelfPathType] = set()

        # NOTE: `S3.glob` is not implemented...
        #       it use `FS.list` instead of `glob`.
        try:
            for entry in self._fs.list(p, recursive=recursive):
                assert isinstance(entry, str)
                # Rebuild the entry segment by segment below this path.
                e = self.with_segments(*self.parts)
                for part in entry.split(self.sep):
                    if part:
                        e = e / part

                if e not in prefixes and match_(str(e), pattern):
                    prefixes.add(e)
                    yield e
        except FileNotFoundError:
            # NOTE: raise from `os.scandir` in `Local` class.
            pass

    def group(self) -> str:
        raise _not_supported()

    def is_dir(self) -> bool:
        p = self._as_relative_to_fs() + self.sep
        if _has_directory_feature(self._fs):
            return self._fs.isdir(p)  # type: ignore
        else:
            # FIXME: `S3.isdir` has a bug (?)
            # Without real directories, a prefix counts as a directory
            # as soon as one non-directory entry is listed under it.
            for entry in self._fs.list(p, recursive=True):
                assert isinstance(entry, str)
                if entry and not entry.endswith(self.sep):
                    return True
            return False

    def is_file(self) -> bool:
        p = self._as_relative_to_fs()
        return self._fs.exists(p) and not self._fs.isdir(p)

    def is_junction(self) -> bool:
        return False  # only Windows supports junctions

    def is_mount(self) -> bool:
        raise _not_supported()

    def is_symlink(self) -> bool:
        raise _not_supported()

    def is_socket(self) -> bool:
        raise _not_supported()

    def is_fifo(self) -> bool:
        raise _not_supported()

    def is_block_device(self) -> bool:
        raise _not_supported()

    def is_char_device(self) -> bool:
        raise _not_supported()

    def iterdir(self: SelfPathType) -> Iterator[SelfPathType]:
        """
        Yield the entries `FS.list` reports directly under this path.

        Raises:
            NotADirectoryError: this path is not a directory (raised
                when iteration starts, since this is a generator).
        """
        if self.is_dir():
            for entry in self._fs.list(self._as_relative_to_fs()):
                assert isinstance(entry, str)
                yield self.with_segments(*self.parts, entry)
        else:
            raise NotADirectoryError(f"'{self.as_posix()}' is not a directory")

    def walk(
        self,
        top_down: bool = True,
        on_error: Optional[Callable] = None,
        follow_symlinks: bool = False,
    ) -> Iterator[Tuple["Path", List[str], List[str]]]:
        raise _not_supported()

    def lchmod(self, mode: int) -> None:
        raise _not_supported()

    def lstat(self) -> FileStat:
        raise _not_supported()

    def mkdir(
        self,
        mode: int = 0o777,
        parents: bool = False,
        exist_ok: bool = False,
    ) -> None:
        """
        Create this directory through `FS.makedirs`.

        Does nothing on file systems without directory support.

        Raises:
            FileNotFoundError: `parents` is false and the parent is not
                a directory.
        """
        if _has_directory_feature(self._fs):
            if not parents and not self.parent.is_dir():
                raise FileNotFoundError(
                    f"'{self.parent.as_posix()}' is not a directory"
                )

            self._fs.makedirs(
                self._as_relative_to_fs(),
                mode=mode,
                exist_ok=exist_ok,
            )

    def open(
        self,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ) -> IOBase:
        # NOTE: first argument is `file_path` in `Local`,
        #       but `S3` is `path`; hence it is passed positionally.
        return self._fs.open(  # type: ignore
            self._as_relative_to_fs(),
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    def owner(self) -> str:
        raise _not_supported()

    def read_bytes(self) -> bytes:
        with self.open(mode="rb", buffering=0) as f:
            return f.read()  # type: ignore

    def read_text(
        self,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
    ) -> str:
        with self.open(mode="rt", encoding=encoding, errors=errors) as f:
            return f.read()  # type: ignore

    def readlink(self: SelfPathType) -> SelfPathType:
        raise _not_supported()

    def rename(
        self: SelfPathType,
        target: Union[str, PurePath],
    ) -> SelfPathType:
        """
        Move this path to `target` and return the new path object.

        On `S3` the move is emulated with the module-level `copy` and
        `unlink` helpers; other file systems use `FS.rename`.
        """
        if isinstance(self._fs, S3):
            # NOTE: S3 API does not support rename a `prefix`.
            #       we implemented by `copy` + `remove`.
            #       it is generic but poor performance...
            target = self.with_segments(
                str(target).removeprefix(self.anchor)
            )
            # These are the module-level functions, not the methods.
            copy(self, target)
            unlink(self)
        else:
            target = (
                target.as_posix().removeprefix(self.anchor)
                if isinstance(target, PurePath)
                else target
            )
            self._fs.rename(self._as_relative_to_fs(), target)

        return self.with_segments(str(target))

    def replace(
        self: SelfPathType,
        target: Union[str, PurePath],
    ) -> SelfPathType:
        return self.rename(target)

    def absolute(self: SelfPathType) -> SelfPathType:
        return self.with_segments("/" / self._pure)

    def resolve(self: SelfPathType, strict: bool = False) -> SelfPathType:
        """
        Normalize the path (`posixpath.normpath`) and make it absolute.

        Raises:
            FileNotFoundError: `strict` is true and the result does not
                exist.
        """
        # NOTE: symbolic link is not supported
        normalized_path = normpath(self._as_relative_to_fs())
        resolved_path = self.with_segments(normalized_path).absolute()

        if strict and not resolved_path.exists():
            raise FileNotFoundError(f"'{self.as_posix()}' not found")

        return resolved_path

    def rglob(
        self: SelfPathType,
        pattern: str,
        *,
        case_sensitive: Optional[bool] = None,
    ) -> Iterator[SelfPathType]:
        # Same as `glob` with "**/" put in front of the pattern.
        return self.glob(
            joinpath("**", pattern.removeprefix(self.sep)),
            case_sensitive=case_sensitive,
        )

    def rmdir(self) -> None:
        """
        Remove this directory through `FS.remove`.

        Does nothing on file systems without directory support.

        Raises:
            NotADirectoryError: this path is not a directory.
        """
        if _has_directory_feature(self._fs):
            if self.is_dir():
                self._fs.remove(self._as_relative_to_fs())
            else:
                raise NotADirectoryError(
                    f"'{self.as_posix()}' is not a directory"
                )

    def samefile(self, other_path: Union[str, SelfPathType]) -> bool:
        raise _not_supported()

    def symlink_to(
        self,
        target: Union[str, PathLike],
        target_is_directory: bool = False,
    ) -> None:
        raise _not_supported()

    def hardlink_to(self, target: Union[str, PathLike]) -> None:
        raise _not_supported()

    def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
        """
        Create an empty file at this path if nothing exists there yet.

        Args:
            mode: accepted for `pathlib` compatibility; not used.
            exist_ok: when false, an existing path is an error.

        Raises:
            FileExistsError: the path exists and `exist_ok` is false.
        """
        if self.exists():
            if not exist_ok:
                raise FileExistsError(f"'{self.as_posix()}' exists")
            # Re-opening in "wb" would truncate the existing content.
            # `pathlib.Path.touch` never alters content, so leave the
            # entry alone; its modification time is not updated.
            return

        with self.open("wb") as f:
            f.write(b"")

    def unlink(self, missing_ok: bool = False) -> None:
        """
        Remove this file.

        Raises:
            IsADirectoryError: the path is a directory.
            FileNotFoundError: nothing exists at the path and
                `missing_ok` is false.
        """
        if self.is_dir():
            raise IsADirectoryError(f"'{self.as_posix()}' is a directory")
        elif self.is_file():
            self._fs.remove(self._as_relative_to_fs())
        elif not missing_ok:
            raise FileNotFoundError(f"'{self.as_posix()}' is not a file")

    def write_bytes(self, data: bytes) -> int:
        with self.open(mode="wb") as f:
            return f.write(data)  # type: ignore

    def write_text(
        self,
        data: str,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ) -> int:
        with self.open(
            mode="wt",
            encoding=encoding,
            errors=errors,
            newline=newline,
        ) as f:
            return f.write(data)  # type: ignore
def copy(
    src: Path,
    dst: Path,
    *,
    chunk_size: int = 16 * (2**20),
) -> None:
    """
    simplified copy interface.

    Args:
        src: source path of file or directory.
        dst: destination path of file or directory.
        chunk_size: blob size to copy object.

    Raises:
        IsADirectoryError: `src` is a file but `dst` is a directory.
        NotADirectoryError: `src` is a directory but `dst` is a file.
        RuntimeError: `src` is neither a file nor a directory.

    Note:
        if `src` is a directory, all child entries in
        directory will be copied to `dst` recursively.
    """
    src = src.resolve()
    dst = dst.resolve()

    if src.is_file():
        if dst.is_dir():
            raise IsADirectoryError(f"{dst.as_uri()} is a directory")

        _copy_file_content(src, dst, chunk_size)

    elif src.is_dir():
        if dst.is_file():
            raise NotADirectoryError(f"{dst.as_uri()} is a file")

        for entry in src.rglob("*"):
            if entry.is_dir():
                continue

            # Recreate the entry's position below `src` under `dst`.
            entry = entry.relative_to(src)
            (dst / entry.parent).mkdir(parents=True, exist_ok=True)
            _copy_file_content(src / entry, dst / entry, chunk_size)

    else:
        raise RuntimeError(f"unexpected storage object: {src.as_uri()}")


def _copy_file_content(src: Path, dst: Path, chunk_size: int) -> None:
    # Stream one file from `src` to `dst` in `chunk_size` blocks.
    # Both streams are opened inside a single `with`, so the reader is
    # closed even when opening the writer fails.
    with src.open("rb") as rf, dst.open("wb") as wf:
        while chunk := rf.read(chunk_size):
            wf.write(chunk)
def unlink(target: Path) -> None:
    """
    simplified remove interface.

    Args:
        target: path of file or directory.

    Raises:
        RuntimeError: `target`, or an entry found below it, is neither
            a file nor a directory.

    Note:
        if `target` is a directory, all child entries in
        directory will be removed recursively.
    """
    target = target.resolve()

    if target.is_dir():
        files: List[Path] = []
        dirs: Set[Path] = set()
        target_depth = len(target.parts)

        for entry in target.rglob("*"):
            if entry.is_dir():
                continue

            if not entry.is_file():
                raise RuntimeError(f"unexpected entry: {entry.as_uri()}")

            files.append(entry)

            # Queue every directory between the file and `target`, not
            # only the direct parent: a directory that holds nothing but
            # subdirectories must be removed too, or `target` is still
            # non-empty at the end. Stop at an already queued directory,
            # whose ancestors were queued with it.
            parent = entry.parent
            while len(parent.parts) > target_depth and parent not in dirs:
                dirs.add(parent)
                parent = parent.parent

        for f in files:
            f.unlink(missing_ok=True)

        # Deepest first, so children go before their parents.
        for d in sorted(dirs, key=lambda p: len(p.parts), reverse=True):
            if d.is_dir():
                d.rmdir()

        if target.is_dir():
            target.rmdir()

    elif target.is_file():
        target.unlink(missing_ok=True)

    else:
        raise RuntimeError(f"unexpected storage object: {target.as_uri()}")