import abc
import contextlib
import copy
import os
import stat
import warnings
from abc import abstractmethod
from io import IOBase
from types import TracebackType
from typing import Any, Callable, Dict, Iterator, Optional, Type, Union
from urllib.parse import urlparse
from deprecation import deprecated
from pfio.v2 import config
from pfio.version import __version__ # NOQA
class FileStat(abc.ABC):
"""Detailed file or directory information abstraction
:meth:`pfio.v2.FS.stat` returns an object that implements of
``FileStat``. In addition to the common attributes that the
``FileStat`` abstract provides, each ``FileStat`` subclass
implements some additional attributes depending on what
information the corresponding filesystem or container can handle.
The common attributes have the same behavior despite filesystem or
container type difference.
Attributes:
filename (str):
Filename in the filesystem or container.
last_modifled (float):
UNIX timestamp of mtime. Note that some
filesystems or containers do not have sub-second precision.
mode (int):
Permission with file type flag (regular file or directory).
You can make a human-readable interpretation by
`stat.filemode <https://docs.python.org/3/library/stat.html#stat.filemode>`_.
size (int):
Size in bytes. Note that directories may have different
sizes depending on the filesystem or container type.
""" # NOQA
filename = None
last_modified = None
mode = None
size = None
def isdir(self):
"""Returns whether the target is a directory from the permission flag
Note that some systems do not support directory tree semantics.
Returns:
`True` if directory, `False` otherwise.
"""
return bool(self.mode & 0o40000)
def __str__(self):
if isinstance(self.mode, int):
mode = stat.filemode(self.mode)
else:
mode = self.mode
return '<{} filename="{}" mode="{}">'.format(
type(self).__name__, self.filename, mode)
def __repr__(self):
return str(self.__str__())
[docs]
class ForkedError(RuntimeError):
'''An error class when PFIO found the process forked.
If an FS object is not "lazy", any object usage detects process
fork and raises this ``ForkedError`` as soon as possible at the
child process. The parent process may or may not run well,
depending on the ``FS`` implementation.
'''
pass
[docs]
class FS(abc.ABC):
'''FS access abstraction
'''
_cwd = ''
def __init__(self):
self.pid = os.getpid()
@property
def cwd(self):
return self._cwd
@cwd.setter
def cwd(self, value):
self._cwd = value
@abstractmethod
def open(self, file_path: str, mode: str = 'rb',
buffering: int = -1, encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
closefd: bool = True,
opener: Optional[Callable[
[str, int], Any]] = None) -> IOBase:
raise NotImplementedError()
def open_zip(self, file_path: str, mode='r', **kwargs):
# Avoid circular import
from .zip import _open_zip
return _open_zip(self, file_path, mode, **kwargs)
# Self-typing needs Python 3.11, PEP-673
[docs]
def subfs(self, rel_path: str) -> 'FS':
'''Virtually changes the working directory
By default it performs shallow copy. If any resource that as
different lifecycles than the copy source (e.g. HDFS
connection and zipfile.ZipFile object), they also will be
copied by overriding this method.
'''
if rel_path.startswith("/"):
raise RuntimeError("Absolute path is not supported")
elif '..' in rel_path.split(os.path.sep):
raise RuntimeError("Only subtree is supported")
return self._newfs(os.path.join(self.cwd, rel_path))
def _newfs(self, path: str) -> 'FS':
fs = copy.copy(self)
fs._cwd = path
fs._reset()
return fs
def _checkfork(self):
if not self.is_forked:
return
# Forked!
self._reset()
self.pid = os.getpid()
@abstractmethod
def _reset(self):
raise NotImplementedError()
@property
def is_forked(self):
assert hasattr(self, 'pid')
return self.pid != os.getpid()
def close(self) -> None:
pass
[docs]
@abstractmethod
def list(self, path_or_prefix: Optional[str] = None,
recursive=False, detail=False) -> Iterator[Union[FileStat, str]]:
"""Lists all the files and directories under
the given ``path_or_prefix``
Args:
path_or_prefix (str): The path to list against.
When we get the default value, ``list`` shows the content under
the working directory as the default value.
However, if a ``path_or_prefix`` is given,
then it shows only the files and directories
under the ``path_or_prefix``.
recursive (bool): When this is ``True``, list files and directories
recursively.
detail (bool): If this is ``True``, the return values will be the
detail information of each file or directory.
Returns:
An Iterator that iterates though the files and directories.
"""
raise NotImplementedError()
[docs]
@abstractmethod
def stat(self, path: str) -> FileStat:
"""Show details of a file
It returns an object of subclass of :class:`pfio.io.FileStat`
in accordance with filesystem or container type.
Args:
path (str): The path to file
Returns:
:class:`pfio.io.FileStat` object.
"""
raise NotImplementedError()
def __enter__(self) -> 'FS':
return self
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType]):
self.close()
[docs]
@abstractmethod
def isdir(self, file_path: str) -> bool:
"""Returns ``True`` if the path is an existing directory
Args:
path (str): the path to the target directory
Returns:
``True`` when the path points to a directory,
``False`` when it is not
"""
raise NotImplementedError()
[docs]
@abstractmethod
def mkdir(self, file_path: str, mode: int = 0o777,
*args, dir_fd: Optional[int] = None) -> None:
"""Makes a directory with mode
Args:
path (str): the path to the directory to make
mode (int): the mode of the new directory
"""
raise NotImplementedError()
[docs]
@abstractmethod
def makedirs(self, file_path: str, mode: int = 0o777,
exist_ok: bool = False) -> None:
"""Makes directories recursively with mode
Also creates all the missing parents of the given path.
Args:
path (str): the path to the directory to make.
mode (int): the mode of the directory
exist_ok (bool): In default case, a ``FileExitsError`` will be
raised when the target directory exists.
"""
raise NotImplementedError()
[docs]
@abstractmethod
def exists(self, path: str) -> bool:
"""Returns the existence of the path
When the ``file_path`` points to a symlink, the return value
depends on the actual file instead of the link itself.
"""
raise NotImplementedError()
[docs]
@abstractmethod
def rename(self, src: str, dst: str) -> None:
"""Renames the file from ``src`` to ``dst``
On systems and situation where rename functionality is
proviced, it renames the file or the directory.
Args:
src (str): the current name of the file or directory.
dst (str): the name to rename to.
"""
raise NotImplementedError()
[docs]
@abstractmethod
def remove(self, file_path: str, recursive: bool = False) -> None:
"""Removes a file or directory
Args:
path (str): the target path to remove. The ``path`` can be a
regular file or a directory.
recursive (bool): When the given path is a directory,
all the files and directories under it will be removed.
When the path is a file, this option is ignored.
"""
raise NotImplementedError()
[docs]
def glob(self, pattern: str) -> Iterator[Union[FileStat, str]]:
"""Returns the files and dictories that match the glob pattern.
"""
raise NotImplementedError()
@abstractmethod
def _canonical_name(self, file_path: str) -> str:
"""Returns its canonical name. Canonical name includes its filesystem
name, endpoint of filesystem, and file_path to represent its consistent
naming. Designed to be used in :py:class:`pfio.v2.HTTPCachedFS`.
"""
raise NotImplementedError
[docs]
@contextlib.contextmanager
def open_url(url: str, mode: str = 'r', **kwargs) -> Iterator[IOBase]:
'''Opens a file regardless of the backend FS type
``url`` must be compliant with URL standard in
https://url.spec.whatwg.org/ . As this function implements
context manager, the FileObject can be written as::
with open_url("s3://bucket.example.com/path/your-file.txt", 'r') as f:
f.read()
.. note:: Some FS resources won't be closed when using this
functionality. See ``from_url`` for keyword arguments.
Returns:
a FileObject that must be closed.
'''
dirname, filename = os.path.split(url)
with from_url(dirname, **kwargs) as fs:
with fs.open(filename, mode) as fp:
yield fp
[docs]
def from_url(url: str, **kwargs) -> 'FS':
'''Factory pattern implementation, creates FS from URI
If ``force_type`` is set with archive type, not scheme,
it ignores the suffix and tries the specified archive
format by opening the blob file.
If ``force_type`` is set with scheme type, the FS will
built from it accordingly. The URL path is supposed to
be a directory for file systems or a path prefix for S3.
.. warning:: When opening an ``hdfs://...`` URL, be sure about
forking context. See: :class:`Hdfs` for discussion.
Arguments:
url (str): A URL string compliant with RFC 1738.
force_type (str): Force type of FS to be returned.
One of "zip", "hdfs", "s3", or "file", returned
respectively. Default is ``"file"``.
create (bool): Create the specified path doesn't exist.
http_cache (str): Prefix url of http cached entries.
In the filesystem with http_cache specified, all read access will
be hooked and upload its content to the url with the given prefix.
For details, please refer to :py:class:`pfio.v2.HTTPCachedFS`.
(experimental feature)
.. note:: Some FS resources won't be closed when using this
functionality.
.. note:: Pickling the FS object may or may not work correctly
depending on the implementation.
'''
if kwargs.pop('reset_on_fork', None) is not None:
warnings.warn(
"reset_on_fork is deprecated. PFIO resets on fork by default",
category=DeprecationWarning,
stacklevel=2
)
parsed = urlparse(url)
if parsed.scheme:
scheme = parsed.scheme
else:
scheme = 'file' # Default is local
# When ``force_type`` is defined, it must be equal with given one.
force_type = kwargs.pop('force_type', None)
if force_type is not None and force_type != "zip":
if force_type != scheme:
raise ValueError("URL scheme mismatch with forced type")
http_cache_prefix = kwargs.pop("http_cache", None)
def _zip_check_create_not_supported():
if kwargs.get('create', False):
msg = '"create" option is not supported for Zip FS.'
raise ValueError(msg)
# force_type \ suffix | .zip | other
# --------------------+---------+------
# zip | ok | try zip
# (other) | try dir | try dir
# None | try zip | try dir
if force_type == 'zip':
_zip_check_create_not_supported()
dirname, filename = os.path.split(parsed.path)
fs = _from_scheme(scheme, dirname, kwargs, bucket=parsed.netloc)
fs = fs.open_zip(filename, **kwargs)
elif force_type is None:
if parsed.path.endswith('.zip'):
_zip_check_create_not_supported()
dirname, filename = os.path.split(parsed.path)
fs = _from_scheme(scheme, dirname, kwargs, bucket=parsed.netloc)
fs = fs.open_zip(filename, **kwargs)
else:
dirname = parsed.path
fs = _from_scheme(scheme, dirname, kwargs, bucket=parsed.netloc)
else:
dirname = parsed.path
fs = _from_scheme(scheme, dirname, kwargs, bucket=parsed.netloc)
if http_cache_prefix is not None:
from .http_cache import HTTPCachedFS
fs = HTTPCachedFS(http_cache_prefix, fs)
return fs
def _from_scheme(scheme, dirname, kwargs, bucket=None):
known_scheme = ['file', 'hdfs', 's3']
# Custom scheme; using configparser for older Python. Will
# update to toml in Python 3.11 once 3.10 is in the end.
if scheme not in known_scheme:
config_dict = config.get_custom_scheme(scheme)
if config_dict is not None:
scheme = config_dict.pop('scheme') # Get the real scheme
# Custom scheme expected here
if scheme not in known_scheme:
raise ValueError("Scheme {} is not supported", scheme)
for k in config_dict:
if k not in kwargs:
# Don't overwrite with configuration value
kwargs[k] = config_dict[k]
if scheme == 'file':
from .local import Local
fs = Local(dirname, **kwargs)
elif scheme == 'hdfs':
from .hdfs import Hdfs
fs = Hdfs(dirname, **kwargs)
elif scheme == 's3':
from .s3 import S3
fs = S3(bucket=bucket, prefix=dirname, **kwargs)
else:
raise RuntimeError("scheme '{}' is not defined".format(scheme))
return fs
[docs]
@deprecated(deprecated_in='2.2.0', removed_in='2.3.0',
current_version=__version__)
def lazify(init_func, lazy_init=True, recreate_on_fork=True):
'''Make FS init lazy and recreate on fork
'''
pass
def format_repr(cls: Type, data: Dict[str, Any]) -> str:
data_str = ", ".join(f"{name}={value!r}" for name, value in data.items())
return f"{cls.__module__}.{cls.__name__}({data_str})"