Source code for pfio.v2.zip

import io
import logging
import os
import zipfile
from datetime import datetime
from typing import Optional, Set

from .fs import FS, FileStat, format_repr

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): a StreamHandler is attached at import time; created without
# a stream argument it writes to sys.stderr. Libraries usually leave handler
# configuration to the application -- confirm this is intentional.
logger.addHandler(logging.StreamHandler())


class ZipFileStat(FileStat):
    """Stat record describing a single member of a zip archive.

    The record is populated from a :class:`zipfile.ZipInfo`; three fields
    are computed and the remaining ones are copied over unchanged.

    Attributes:
        filename (str): ``ZipInfo.filename``.
        orig_filename (str): ``ZipInfo.orig_filename``.
        comment (bytes): ``ZipInfo.comment``.
        last_modified (float): POSIX timestamp computed from
            ``ZipInfo.date_time``. The naive date-time is interpreted by
            ``datetime.timestamp()`` as local time.
            No sub-second precision.
        mode (int): ``ZipInfo.external_attr`` shifted right by 16 bits.
        size (int): ``ZipInfo.file_size``.
        create_system (int): ``ZipInfo.create_system``.
        create_version (int): ``ZipInfo.create_version``.
        extract_version (int): ``ZipInfo.extract_version``.
        flag_bits (int): ``ZipInfo.flag_bits``.
        volume (int): ``ZipInfo.volume``.
        internal_attr (int): ``ZipInfo.internal_attr``.
        external_attr (int): ``ZipInfo.external_attr``.
        header_offset (int): ``ZipInfo.header_offset``.
        compress_size (int): ``ZipInfo.compress_size``.
        compress_type (int): ``ZipInfo.compress_type``.
        CRC (int): ``ZipInfo.CRC``.
    """

    def __init__(self, zip_info):
        """Fill the record from ``zip_info``.

        Args:
            zip_info (zipfile.ZipInfo): Archive member description.
        """
        modified_at = datetime(*zip_info.date_time)
        self.last_modified = float(modified_at.timestamp())

        # The high 16 bits of external_attr carry the mode bits; see
        # https://github.com/python/cpython/blob/3.8/Lib/zipfile.py#L392
        self.mode = zip_info.external_attr >> 16
        self.size = zip_info.file_size

        # Everything else is forwarded from the ZipInfo as-is.
        copied_fields = (
            'filename', 'orig_filename', 'comment',
            'create_system', 'create_version', 'extract_version',
            'flag_bits', 'volume',
            'internal_attr', 'external_attr',
            'CRC', 'header_offset',
            'compress_size', 'compress_type',
        )
        for field_name in copied_fields:
            setattr(self, field_name, getattr(zip_info, field_name))


class Zip(FS):
    """Filesystem-style access to the members of a zip archive.

    The archive itself is opened through another filesystem object
    (``backend``) and wrapped with :class:`zipfile.ZipFile`.  Operations a
    zip archive cannot express (``mkdir``, ``makedirs``, ``rename``,
    ``remove``) raise :class:`io.UnsupportedOperation`.

    ``self.cwd`` and ``self._checkfork()`` are used throughout but are not
    defined in this class; they come from the ``FS`` base class.

    Args:
        backend: Filesystem object that holds the archive.  It must
            provide ``open(path, mode, **kwargs)`` and
            ``_canonical_name(path)``.
        file_path (str): Path of the archive inside ``backend``.
        mode (str): Mode handed to ``zipfile.ZipFile``; ``'b'`` is appended
            when the archive is opened through ``backend``.
        create (bool): Unsupported; a truthy value raises ``ValueError``.
        local_cache (bool): Removed feature; a truthy value raises
            ``NotImplementedError``.
        local_cachedir: Removed feature; a truthy value raises
            ``NotImplementedError``.
        **kwargs: Forwarded to ``backend.open``.

    Raises:
        ValueError: If ``create`` is truthy.
        io.UnsupportedOperation: If ``mode`` contains both ``'r'`` and
            ``'w'``.
        NotImplementedError: If ``local_cache`` or ``local_cachedir`` is
            truthy.
    """

    # Flipped to False on the instance when 'w' appears in the mode.
    _readonly = True

    def __init__(self, backend, file_path, mode='r', create=False,
                 local_cache=False, local_cachedir=None, **kwargs):
        super().__init__()
        self.backend = backend
        self.file_path = file_path
        self.mode = mode
        self.kwargs = kwargs

        if create:
            raise ValueError("create option is not supported")

        if 'r' in mode and 'w' in mode:
            raise io.UnsupportedOperation('Read-write mode is not supported')

        if 'w' in mode:
            self._readonly = False

        if local_cache or local_cachedir:
            raise NotImplementedError("Sparse file cache has been removed.")

        self._reset()

    def _reset(self):
        """(Re)open the archive and rebuild the member-name cache."""
        self.fileobj = self.backend.open(self.file_path,
                                         self.mode + 'b',
                                         **self.kwargs)
        assert self.fileobj is not None
        self.zipobj = zipfile.ZipFile(self.fileobj, self.mode)

        # Names are cached only for read-only archives, whose member list
        # cannot change after opening.
        self.name_cache: Optional[Set[str]] = None
        if self._readonly:
            self.name_cache = self._names()

    def __getstate__(self):
        """Return the instance state without open handles or name cache.

        NOTE(review): the handles are presumably re-created by ``_reset()``
        via the base class's ``_checkfork()`` -- confirm against ``FS``.
        """
        state = self.__dict__.copy()
        state['fileobj'] = None
        state['zipobj'] = None
        state['name_cache'] = None
        return state

    def __setstate__(self, state):
        """Restore the state produced by ``__getstate__``."""
        self.__dict__ = state

    def __repr__(self):
        return format_repr(
            Zip,
            {
                "file_path": self.file_path,
                "mode": self.mode,
                "backend": self.backend,
            },
        )

    def open(self, file_path, mode='r',
             buffering=-1, encoding=None, errors=None,
             newline=None, closefd=True, opener=None):
        """Open a member of the archive.

        Args:
            file_path (str): Member path; normalized and joined to
                ``self.cwd``.
            mode (str): Open mode.  Without ``'b'`` the member is wrapped
                in an :class:`io.TextIOWrapper`.
            buffering: Accepted but not used by this implementation.
            encoding: Passed to ``io.TextIOWrapper`` in text mode.
            errors: Passed to ``io.TextIOWrapper`` in text mode.
            newline: Passed to ``io.TextIOWrapper`` in text mode.
            closefd: Accepted but not used by this implementation.
            opener: Accepted but not used by this implementation.

        Returns:
            A binary file object from ``ZipFile.open``, or a text wrapper
            around it.
        """
        self._checkfork()

        file_path = os.path.join(self.cwd, os.path.normpath(file_path))
        # ZipFile.open yields a binary stream and does not accept a 'b'
        # flag, so strip it and add the text layer afterwards if needed.
        fp = self.zipobj.open(file_path, mode.replace('b', ''))

        if 'b' not in mode:
            fp = io.TextIOWrapper(fp, encoding, errors, newline)

        return fp

    def subfs(self, path):
        """Not implemented for zip archives."""
        # TODO
        raise NotImplementedError()

    def close(self):
        """Close the zip object and the underlying backend file object."""
        self._checkfork()
        try:
            self.zipobj.close()
        finally:
            # Release the backend handle even when finalizing the archive
            # fails, so the file object is never leaked.
            self.fileobj.close()

    def stat(self, path):
        """Return a :class:`ZipFileStat` for ``path``.

        Args:
            path (str): Member path; normalized and joined to ``self.cwd``.

        Raises:
            FileNotFoundError: If neither ``path`` nor ``path + '/'`` is a
                member of the archive.
        """
        self._checkfork()
        names = self._names()
        path = os.path.join(self.cwd, os.path.normpath(path))
        if path in names:
            actual_path = path
        elif not path.endswith('/') and path + '/' in names:
            # A directory member is stored with a trailing slash; accept
            # the same path given without it (see issue 67).
            actual_path = path + '/'
        else:
            raise FileNotFoundError(
                "{} is not found".format(path))

        return ZipFileStat(self.zipobj.getinfo(actual_path))

    def list(self, path_or_prefix: Optional[str] = "",
             recursive=False, detail=False):
        """Iterate over the entries below a directory of the archive.

        This is a generator: validation errors are raised when iteration
        starts, not when the method is called.

        Args:
            path_or_prefix (str or None): Directory to list.  Empty or
                ``None`` means the archive root; a path containing ``.``
                or ``..`` components after normalization is also treated
                as the root.
            recursive (bool): When true, yield every member below the
                directory; otherwise only its immediate children.
            detail (bool): When true, yield :class:`ZipFileStat` objects
                instead of names.

        Yields:
            str or ZipFileStat: With ``recursive``, member names relative
            to the directory.  Otherwise the distinct first path component
            below the directory.  In non-recursive ``detail`` mode the
            stat is built from the first member found under that component.

        Raises:
            NotADirectoryError: If the path exists but is not a directory.
            FileNotFoundError: If no member lies below the path.
        """
        self._checkfork()

        # None is allowed by the signature; treat it like the root.
        path_or_prefix = path_or_prefix or ""

        given_dir_list = []
        if path_or_prefix:
            path_or_prefix = os.path.join(self.cwd,
                                          os.path.normpath(path_or_prefix))
            # cannot move beyond root
            given_dir_list = path_or_prefix.split('/')
            if ("." in given_dir_list
                    or ".." in given_dir_list
                    or {""} == set(given_dir_list)):
                given_dir_list = []
                path_or_prefix = ""

        # Directory prefix including the separator, so that "dir" never
        # matches a sibling such as "dir2/...".
        dir_prefix = path_or_prefix + "/" if path_or_prefix else ""

        if path_or_prefix:
            if self.exists(path_or_prefix) and not self.isdir(path_or_prefix):
                raise NotADirectoryError(
                    "{} is not a directory".format(path_or_prefix))
            elif not any(name.startswith(dir_prefix)
                         for name in self._names()):
                # check if directories are NOT included in the zip
                # such kind of zip can be made with "zip -D"
                raise FileNotFoundError(
                    "{} is not found".format(path_or_prefix))

        if recursive:
            for info in self.zipobj.infolist():
                name = info.filename
                if not name.startswith(dir_prefix):
                    continue
                name = name[len(dir_prefix):].strip("/")
                # The directory's own entry reduces to "" and is skipped.
                if name:
                    yield ZipFileStat(info) if detail else name
        else:
            seen = set()
            depth = len(given_dir_list)
            for info in self.zipobj.infolist():
                parts = os.path.normpath(info.filename).split('/')
                if not given_dir_list:
                    # listing the root: first component of every member
                    child = parts[0]
                elif len(parts) > depth and parts[:depth] == given_dir_list:
                    child = parts[depth]
                else:
                    continue

                if child in seen:
                    continue
                seen.add(child)
                yield ZipFileStat(info) if detail else child

    def isdir(self, file_path: str):
        """Return whether ``file_path`` is a directory in the archive.

        A path counts as a directory when its stat says so, or, if it has
        no entry of its own, when any member lies below it.
        """
        self._checkfork()
        file_path = os.path.join(self.cwd, file_path)
        if self.exists(file_path):
            return self.stat(file_path).isdir()

        # check if directories are NOT included in the zip
        dir_prefix = os.path.normpath(file_path) + "/"
        return any(name.startswith(dir_prefix) for name in self._names())

    def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation("zip does not support mkdir")

    def makedirs(self, file_path: str, mode=0o777, exist_ok=False):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation("zip does not support makedirs")

    def exists(self, file_path: str):
        """Return whether ``file_path`` or ``file_path + '/'`` is a member."""
        self._checkfork()
        file_path = os.path.join(self.cwd, os.path.normpath(file_path))
        # Use the (possibly cached) name set: O(1) membership instead of
        # rebuilding and scanning ZipFile.namelist() on every call.
        names = self._names()
        return file_path in names or file_path + "/" in names

    def rename(self, *args):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation

    def remove(self, file_path, recursive=False):
        """Unsupported; always raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation

    def _canonical_name(self, file_path: str) -> str:
        """Return a name combining the archive's canonical name and a member.

        The archive's own canonical name is obtained from the backend.
        """
        canonical_name = self.backend._canonical_name(self.file_path)
        file_path = os.path.join(self.cwd, os.path.normpath(file_path))

        # Use pfio-zipfs as reserved name to represent PFIO's Zip.
        # If someone use `pfio-zipfs` in file_path, this might be broken.
        return f"{canonical_name}/pfio-zipfs/{file_path}"

    def _names(self) -> Set[str]:
        """Return the set of member names, using the cache when present."""
        if self.name_cache is not None:
            return self.name_cache
        return {data.filename for data in self.zipobj.infolist()}
def _open_zip(fs, file_path, mode, **kwargs) -> Zip:
    """Build a :class:`Zip` for the archive at ``file_path`` on ``fs``.

    Args:
        fs: Filesystem object holding the archive; passed to ``Zip`` as
            its backend.
        file_path: Path of the archive on ``fs``.
        mode: Mode passed through to ``Zip``.
        **kwargs: Extra keyword arguments passed through to ``Zip``.

    Returns:
        Zip: The newly constructed zip filesystem.
    """
    archive = Zip(fs, file_path, mode, **kwargs)
    return archive