import io
import logging
import os
import warnings
import zipfile
from datetime import datetime
from typing import Optional, Set
from pfio.cache.sparse_file import MPCachedWrapper
from .fs import FS, FileStat, format_repr
# Module-level logger for this file. A stream handler is attached at import
# time, so records sent to this logger are emitted through it as well as
# through any handlers configured further up the logger hierarchy.
_stream_handler = logging.StreamHandler()
logger = logging.getLogger(__name__)
logger.addHandler(_stream_handler)
class ZipFileStat(FileStat):
    """Stat record describing one member of a Zip archive.

    Instances are built from a ``zipfile.ZipInfo`` object. Three attributes
    are computed from it and the remaining ones are copied verbatim.

    Attributes:
        filename (str): Derived from `~FileStat`.
        orig_filename (str): ``ZipInfo.orig_filename``.
        comment (bytes): ``ZipInfo.comment``.
        last_modified (float): Derived from `~FileStat`. POSIX timestamp
            computed from ``ZipInfo.date_time`` as a naive (local time)
            datetime. No sub-second precision.
        mode (int): Derived from `~FileStat`. Upper bits of
            ``ZipInfo.external_attr`` (``external_attr >> 16``).
        size (int): Derived from `~FileStat`. ``ZipInfo.file_size``.
        create_system (int): ``ZipInfo.create_system``.
        create_version (int): ``ZipInfo.create_version``.
        extract_version (int): ``ZipInfo.extract_version``.
        flag_bits (int): ``ZipInfo.flag_bits``.
        volume (int): ``ZipInfo.volume``.
        internal_attr (int): ``ZipInfo.internal_attr``.
        external_attr (int): ``ZipInfo.external_attr``.
        header_offset (int): ``ZipInfo.header_offset``.
        compress_size (int): ``ZipInfo.compress_size``.
        compress_type (int): ``ZipInfo.compress_type``.
        CRC (int): ``ZipInfo.CRC``.
    """

    def __init__(self, zip_info):
        # ``date_time`` is a (year, month, day, hour, minute, second) tuple;
        # ``timestamp()`` already returns a float.
        modified_at = datetime(*zip_info.date_time)
        self.last_modified = modified_at.timestamp()

        # https://github.com/python/cpython/blob/3.8/Lib/zipfile.py#L392
        self.mode = zip_info.external_attr >> 16
        self.size = zip_info.file_size

        # Everything else is mirrored one-to-one from the ZipInfo object.
        mirrored_fields = (
            'filename', 'orig_filename', 'comment', 'create_system',
            'create_version', 'extract_version', 'flag_bits',
            'volume', 'internal_attr', 'external_attr', 'CRC',
            'header_offset', 'compress_size', 'compress_type',
        )
        for field_name in mirrored_fields:
            setattr(self, field_name, getattr(zip_info, field_name))
class Zip(FS):
    """Filesystem view over the members of a Zip archive.

    The archive file itself is opened through another filesystem object
    (``backend``) and its members are accessed with :mod:`zipfile`.

    Args:
        backend: Filesystem object holding the archive. This class calls
            its ``open``, ``stat`` and ``_canonical_name`` methods.
        file_path (str): Path of the Zip archive on ``backend``.
        mode (str): ``'r'`` (default) or ``'w'``. A mode containing both
            is rejected.
        create (bool): Not supported; a true value raises ``ValueError``.
        local_cache (bool): use sparse file cache for opening ZIP file
        local_cachedir (dir): local path to store sparse file cache
        **kwargs: Forwarded to ``backend.open`` (``buffering`` is also
            read here to size the read buffer of the sparse file cache).

    Raises:
        ValueError: If ``create`` is true.
        io.UnsupportedOperation: If ``mode`` contains both ``'r'`` and
            ``'w'``.
    """

    _readonly = True

    def __init__(self, backend, file_path, mode='r', create=False,
                 local_cache=False, local_cachedir=None, **kwargs):
        super().__init__()

        self.backend = backend
        self.file_path = file_path
        self.mode = mode
        self.kwargs = kwargs

        if create:
            raise ValueError("create option is not supported")

        if 'r' in mode and 'w' in mode:
            raise io.UnsupportedOperation('Read-write mode is not supported')

        if 'w' in mode:
            self._readonly = False

        self.local_cache = local_cache
        self.local_cachedir = local_cachedir
        # Filled in by _reset() once the sparse file cache has been created
        self.local_cachefile = None
        self.local_indexfile = None

        self._reset()

    def _reset(self):
        """(Re)open the archive through the backend and build ``zipobj``.

        Sets ``fileobj``, ``zipobj`` and ``name_cache``. In read mode with
        ``local_cache`` enabled, the backend file is wrapped in the sparse
        file cache and then in an ``io.BufferedReader``.
        """
        buffering = self.kwargs.get('buffering', 16*1024*1024)
        if self.local_cache:
            # Don't use io.BufferedReader for sparse file cache
            self.kwargs['buffering'] = 0

        obj = self.backend.open(self.file_path,
                                self.mode + 'b',
                                **self.kwargs)

        if 'w' not in self.mode:
            stat = self.backend.stat(self.file_path)

            # Use sparse file cache: Optimization for a remote object
            # store system e.g. AWS S3 or HDFS
            if self.local_cache:
                # Will be removed in 2.8
                warnings.warn("Sparse file cache deprecated in 2.7",
                              DeprecationWarning)
                # Restore the caller-visible buffering value that was
                # overridden with 0 for the raw open above.
                self.kwargs['buffering'] = buffering
                obj = MPCachedWrapper(obj, stat.size, self.local_cachedir,
                                      local_cachefile=self.local_cachefile,
                                      local_indexfile=self.local_indexfile,
                                      close_on_close=True,
                                      multithread_safe=True)

                # Update local cachefile in case of being forked
                self.local_cachefile = obj.local_cachefile
                self.local_indexfile = obj.local_indexfile

                # Default 16MB buffer size
                if buffering > 0:
                    obj = io.BufferedReader(obj, buffer_size=buffering)

        self.fileobj = obj
        assert self.fileobj is not None
        try:
            self.zipobj = zipfile.ZipFile(self.fileobj, self.mode)
        except Exception:
            # ZipFile does not close a file object it was handed, so
            # release the backend handle ourselves before propagating.
            self.fileobj.close()
            raise

        self.name_cache: Optional[Set[str]] = None
        if self._readonly:
            # A read-only archive cannot change, so the member names can
            # be computed once and reused.
            self.name_cache = self._names()

    def __getstate__(self):
        """Return picklable state with the open handles and cache dropped."""
        state = self.__dict__.copy()
        state['fileobj'] = None
        state['zipobj'] = None
        state['name_cache'] = None
        return state

    def __setstate__(self, state):
        """Restore state as saved by ``__getstate__`` (handles are None)."""
        self.__dict__ = state

    def __repr__(self):
        return format_repr(
            Zip,
            {
                "file_path": self.file_path,
                "mode": self.mode,
                "backend": self.backend,
            },
        )

    def open(self, file_path, mode='r',
             buffering=-1, encoding=None, errors=None,
             newline=None, closefd=True, opener=None):
        """Open a member of the archive.

        Args:
            file_path (str): Member path, resolved against ``self.cwd``.
            mode (str): ``'b'`` is stripped before the mode is handed to
                ``ZipFile.open``; without ``'b'`` the result is wrapped
                in an ``io.TextIOWrapper``.
            encoding, errors, newline: Passed to ``io.TextIOWrapper`` in
                text mode.
            buffering, closefd, opener: Accepted but not used here.

        Returns:
            A binary file object from ``ZipFile.open``, or a text wrapper
            around it.
        """
        self._checkfork()

        file_path = os.path.join(self.cwd, os.path.normpath(file_path))
        fp = self.zipobj.open(file_path, mode.replace('b', ''))

        if 'b' not in mode:
            fp = io.TextIOWrapper(fp, encoding, errors, newline)

        return fp

    def subfs(self, path):
        """Not implemented for Zip."""
        # TODO
        raise NotImplementedError()

    def close(self):
        """Close the Zip object and then the underlying backend file."""
        self._checkfork()
        try:
            self.zipobj.close()
        finally:
            # Always release the backend handle, even when closing the
            # archive itself fails.
            self.fileobj.close()

    def stat(self, path):
        """Return a :class:`ZipFileStat` for ``path``.

        Args:
            path (str): Member path, resolved against ``self.cwd``.

        Raises:
            FileNotFoundError: If neither ``path`` nor ``path + '/'`` is
                a member name.
        """
        self._checkfork()
        names = self._names()
        path = os.path.join(self.cwd, os.path.normpath(path))

        if path in names:
            actual_path = path
        elif not path.endswith('/') and path + '/' in names:
            # handles cases when path is a directory but without trailing
            # slash (upstream issue 67)
            actual_path = path + '/'
        else:
            raise FileNotFoundError(
                "{} is not found".format(path))

        return ZipFileStat(self.zipobj.getinfo(actual_path))

    def list(self, path_or_prefix: Optional[str] = "", recursive=False,
             detail=False):
        """Iterate over the entries below a directory of the archive.

        This is a generator, so errors are raised on first iteration.

        Args:
            path_or_prefix (str or None): Directory to list. An empty
                value, ``None``, or a path containing ``.``/``..``
                components lists the archive root.
            recursive (bool): If true, yield every member below the
                directory (paths relative to it); otherwise yield only
                the distinct first-level names.
            detail (bool): If true, yield :class:`ZipFileStat` objects
                instead of names. In non-recursive mode the stat is that
                of the first archive member found under each name.

        Raises:
            NotADirectoryError: If the path exists but is not a directory.
            FileNotFoundError: If no member lives below the path.
        """
        self._checkfork()

        if path_or_prefix:
            path_or_prefix = os.path.join(self.cwd,
                                          os.path.normpath(path_or_prefix))

            # cannot move beyond root
            given_dir_list = path_or_prefix.split('/')
            if ("." in given_dir_list or ".." in given_dir_list
                    or {""} == set(given_dir_list)):
                given_dir_list = []
                path_or_prefix = ""
        else:
            # None and "" both mean the archive root
            path_or_prefix = ""
            given_dir_list = []

        if path_or_prefix:
            if self.exists(path_or_prefix) and not self.isdir(path_or_prefix):
                raise NotADirectoryError(
                    "{} is not a directory".format(path_or_prefix))
            elif not any(name.startswith(path_or_prefix + "/")
                         for name in self._names()):
                # check if directories are NOT included in the zip
                # such kind of zip can be made with "zip -D"
                raise FileNotFoundError(
                    "{} is not found".format(path_or_prefix))

        if recursive:
            # Match on the directory boundary so that listing "dir" does
            # not pick up members of a sibling such as "dir2/".
            dir_prefix = path_or_prefix + "/" if path_or_prefix else ""
            for info in self.zipobj.infolist():
                name = info.filename
                if not name.startswith(dir_prefix):
                    continue
                relative = name[len(dir_prefix):].strip("/")
                # An empty remainder is the directory entry itself
                if relative:
                    if detail:
                        yield ZipFileStat(info)
                    else:
                        yield relative
        else:
            seen = set()
            depth = len(given_dir_list)
            for info in self.zipobj.infolist():
                parts = os.path.normpath(info.filename).split('/')
                if not given_dir_list:
                    # listing the root: first path component
                    child = parts[0]
                elif len(parts) > depth and parts[:depth] == given_dir_list:
                    # member below the requested directory: next component
                    child = parts[depth]
                else:
                    continue

                if child in seen:
                    continue
                seen.add(child)
                if detail:
                    yield ZipFileStat(info)
                else:
                    yield child

    def isdir(self, file_path: str):
        """Return True if ``file_path`` is a directory in the archive.

        A path counts as a directory either when its own entry reports
        being one, or when it has no entry but some member name starts
        with ``file_path + '/'``.
        """
        self._checkfork()
        file_path = os.path.join(self.cwd, file_path)
        if self.exists(file_path):
            return self.stat(file_path).isdir()
        else:
            file_path = os.path.normpath(file_path)
            # check if directories are NOT included in the zip
            if any(name.startswith(file_path + "/")
                   for name in self._names()):
                return True

        return False

    def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation("zip does not support mkdir")

    def makedirs(self, file_path: str, mode=0o777, exist_ok=False):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation("zip does not support makedirs")

    def exists(self, file_path: str):
        """Return True if ``file_path`` or ``file_path + '/'`` is a member."""
        self._checkfork()
        file_path = os.path.join(self.cwd, os.path.normpath(file_path))
        # Use the (cached, in read-only mode) name set for O(1) lookups
        names = self._names()
        return (file_path in names
                or file_path + "/" in names)

    def rename(self, *args):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation

    def remove(self, file_path, recursive=False):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation

    def _canonical_name(self, file_path: str) -> str:
        """Build a name from the backend's canonical archive name and path."""
        canonical_name = self.backend._canonical_name(self.file_path)
        file_path = os.path.join(self.cwd, os.path.normpath(file_path))
        # Use pfio-zipfs as reserved name to represent PFIO's Zip.
        # If someone use `pfio-zipfs` in file_path, this might be broken.
        return f"{canonical_name}/pfio-zipfs/{file_path}"

    def _names(self) -> Set[str]:
        """Return the set of member names, using ``name_cache`` if set."""
        if self.name_cache is not None:
            return self.name_cache
        return {data.filename for data in self.zipobj.infolist()}
def _open_zip(fs, file_path, mode, **kwargs) -> Zip:
    """Create a :class:`Zip` filesystem for an archive stored on ``fs``.

    Args:
        fs: Filesystem object passed to ``Zip`` as its backend.
        file_path: Path of the Zip archive on ``fs``.
        mode: Mode string passed to ``Zip``.
        **kwargs: Extra keyword arguments forwarded to ``Zip``.

    Returns:
        The newly constructed ``Zip`` instance.
    """
    zip_fs = Zip(fs, file_path, mode, **kwargs)
    return zip_fs