Source code for pfio.cache.file_cache

import errno
import numbers
import os
import pickle
import subprocess
import tempfile
import threading
import warnings
from struct import calcsize, pack, unpack

from pfio import cache

# Deprecated, but kept for backward compatibility just in case any
# system is directly using this value
_DEFAULT_CACHE_PATH = os.path.join(
    os.getenv('HOME', ""), ".pfio", "cache")

_FORCE_LOCAL = True


def _default_cache_path():
    basedir = os.getenv('XDG_CACHE_HOME')

    if not basedir:
        basedir = os.path.join(os.getenv('HOME'), ".cache")

    cachedir = os.path.join(basedir, "pfio")
    return cachedir


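# Illustrative note (not part of the original module): with
# ``XDG_CACHE_HOME=/mnt/ssd/cache`` set, the default cache directory above
# resolves to ``/mnt/ssd/cache/pfio``; when the variable is unset or empty,
# it falls back to ``$HOME/.cache/pfio``.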
class LockContext:
    def __init__(self, locked_lock):
        self.locked_lock = locked_lock

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.locked_lock.unlock()


class RWLock:
    '''Reader-writer lock

    TODO(kuenishi): Add unit tests

    '''

    def __init__(self):
        self.cv = threading.Condition()
        self.writer = None
        self.reader = set()

    def rdlock(self):
        with self.cv:
            self.cv.wait_for(lambda: self.writer is None)
            self.reader.add(threading.get_ident())
            return LockContext(self)

    def wrlock(self):
        with self.cv:
            thread_id = threading.get_ident()
            self.cv.wait_for(lambda: self.writer is None and
                             self.writer != thread_id and
                             len(self.reader) == 0)
            self.writer = thread_id
            return LockContext(self)

    def unlock(self):
        with self.cv:
            thread_id = threading.get_ident()
            if self.writer == thread_id:
                self.writer = None
            else:
                self.reader.remove(thread_id)
            self.cv.notify_all()


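# Usage sketch (illustrative, not part of the original module): both lock
# classes hand back a ``LockContext``, so callers release the lock simply by
# leaving a ``with`` block::
#
#     lock = RWLock()
#     with lock.rdlock():
#         pass  # shared section: any number of readers may enter
#     with lock.wrlock():
#         pass  # exclusive section: waits until no reader or writer is active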
class DummyLock:
    '''Dummy lock for the multithread-unsafe, fast cache class
    '''

    def __init__(self):
        pass

    def rdlock(self):
        return LockContext(self)

    def wrlock(self):
        return LockContext(self)

    def unlock(self):
        pass


def _check_local(path: str):
    '''Verifies that the path is not on NFS

    Raises an error if ``stat(1)`` reports the filesystem type as NFS.

    '''
    global _FORCE_LOCAL
    if not _FORCE_LOCAL:
        return

    # From https://unix.stackexchange.com/a/402236
    cp = subprocess.run(['stat', '-f', '-c', '%T', path], check=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT)
    if 'nfs' in cp.stdout.decode():
        raise ValueError("NFS can't be used for cache: {}".format(path))


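# Illustrative example (assumed ``stat`` output, not taken from this module):
# on a local ext4 mount, ``stat -f -c %T /path`` typically prints
# ``ext2/ext3``, whereas on an NFS mount it prints ``nfs``, which the
# substring check above rejects.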
class FileCache(cache.Cache):
    '''Cache system with local filesystem

    Stores cache data in a local temporary file created in
    ``$XDG_CACHE_HOME/pfio`` by default. If it is unset,
    ``$HOME/.cache/pfio`` will be the cache destination.

    Cache data is automatically deleted after the object is collected. When
    this object is not correctly closed (e.g., the process is killed by
    SIGTERM), the cache remains after the death of the process.

    .. note:: This feature requires the ``stat(1)`` command from GNU
        coreutils.

    Arguments:
        length (int): Length of the cache array.

        multithread_safe (bool): Defines multithread safety. If this is
            ``True``, a reader-writer locking system based on
            ``threading.Lock`` is introduced behind the cache management.
            The major use case is Chainer's ``MultithreadIterator``.

        do_pickle (bool): Do automatic pickle and unpickle inside the cache.

        dir (str): The path to the directory to place cache data in, in case
            the home directory is not backed by a fast storage device.
            Must not be on NFS.

        cache_size_limit (None or int): Limit of the cache size in bytes.
            If the total amount of cached data reaches the limit, the cache
            becomes frozen and no longer accepts further additions.
            Data already stored in the cache can be accessed normally.
            None (default) and 0 mean unlimited.

        verbose (bool): Print detailed logs of the cache.

    '''

    def __init__(self, length, multithread_safe=False, do_pickle=False,
                 dir=None, cache_size_limit=None, verbose=False):
        self._multithread_safe = multithread_safe
        self.length = length
        self.do_pickle = do_pickle
        if self.length <= 0 or (2 ** 64) <= self.length:
            raise ValueError("length has to be between 0 and 2^64")

        if not (cache_size_limit is None or
                (isinstance(cache_size_limit, numbers.Number) and
                 0 <= cache_size_limit)):
            msg = "cache_size_limit has to be either None, zero " \
                  "(both indicate unlimited) or larger than 0. " \
                  "{} is specified.".format(cache_size_limit)
            raise ValueError(msg)
        self.cache_size_limit = cache_size_limit

        if self.multithread_safe:
            self.lock = RWLock()
        else:
            self.lock = DummyLock()

        if dir is None:
            self.dir = _default_cache_path()
        else:
            self.dir = dir
        os.makedirs(self.dir, exist_ok=True)
        _check_local(self.dir)

        self.closed = False
        self.cachefp = tempfile.NamedTemporaryFile(delete=True, dir=self.dir)

        # Allocate space to store ``length`` index entries, each a pair of
        # uint64 offset and int64 length, initialized to (0, -1).
        # The cache data will be appended after the indices.
        buf = pack('Qq', 0, -1)
        self.buflen = calcsize('Qq')
        assert self.buflen == 16
        for i in range(self.length):
            offset = self.buflen * i
            r = os.pwrite(self.cachefp.fileno(), buf, offset)
            assert r == self.buflen
        self.pos = self.buflen * self.length

        self.verbose = verbose
        if self.verbose:
            print('created cache file:', self.cachefp.name)

        self._frozen = False

    def __len__(self):
        return self.length

    @property
    def frozen(self):
        return self._frozen

    @property
    def multiprocess_safe(self):
        # If it's preserved/preloaded, then the file contents are fixed.
        return self._frozen

    @property
    def multithread_safe(self):
        return self._multithread_safe

    def get(self, i):
        if self.closed:
            return
        data = self._get(i)
        if self.do_pickle and data:
            data = pickle.loads(data)
        return data

    def _get(self, i):
        if i < 0 or self.length <= i:
            raise IndexError("index {} out of range ([0, {}])"
                             .format(i, self.length - 1))

        offset = self.buflen * i
        with self.lock.rdlock():
            buf = os.pread(self.cachefp.fileno(), self.buflen, offset)
            (o, l) = unpack('Qq', buf)
            if l < 0 or o < 0:
                return None

            data = os.pread(self.cachefp.fileno(), l, o)
            assert len(data) == l
            return data

    def put(self, i, data):
        if self._frozen or self.closed:
            return False
        try:
            if self.do_pickle:
                data = pickle.dumps(data)
            return self._put(i, data)

        except OSError as ose:
            # Disk full (ENOSPC), possibly caused by the cache;
            # just warn and keep running.
            if ose.errno == errno.ENOSPC:
                warnings.warn(ose.strerror, RuntimeWarning)
                return False
            else:
                raise ose

    def _put(self, i, data):
        if self.closed:
            return
        if i < 0 or self.length <= i:
            raise IndexError("index {} out of range ([0, {}])"
                             .format(i, self.length - 1))

        if self.cache_size_limit:
            if self.cache_size_limit < (self.pos + len(data)):
                self._frozen = True
                return False

        offset = self.buflen * i
        with self.lock.wrlock():
            buf = os.pread(self.cachefp.fileno(), self.buflen, offset)
            (o, l) = unpack('Qq', buf)
            if l >= 0 and o >= 0:
                # Data already exists
                return False

            pos = self.pos

            '''Notes on the possibility of partial writes

            write(3) says partial writes (``ret < nbyte``) may happen when
            ``nbytes > PIPE_BUF``. In Linux 5.0, ``PIPE_BUF`` is 4096, so
            partial writes do not happen when writing the index, but they
            may happen when writing the data. We hope it is rare; it seems
            to happen mostly with multiple writer processes, a full disk,
            or ``EINTR``.

            CPython handles this case by retrying ``pwrite(2)`` as long as
            it returns ``-1``, but it returns as soon as the return value
            is positive, so partial writes still have to be handled here.
            '''
            buf = pack('Qq', pos, len(data))
            r = os.pwrite(self.cachefp.fileno(), buf, offset)
            assert r == self.buflen

            current_pos = pos
            while current_pos - pos < len(data):
                r = os.pwrite(self.cachefp.fileno(),
                              data[current_pos-pos:], current_pos)
                assert r > 0
                current_pos += r
            assert current_pos - pos == len(data)

            self.pos += len(data)
            return True

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    def close(self):
        with self.lock.wrlock():
            if not self.closed:
                self.closed = True
                self.cachefp.close()
                self.cachefp = None
    def preload(self, name):
        '''Load the cache saved by ``preserve()``

        ``name`` is the name of the persistent file in the cache directory.
        To use the cache in a ``multiprocessing`` environment, call this
        method in every forked process, except the process that called
        ``preserve()``. After the preload, no data can be added to the
        cache.

        When it succeeds, it returns ``True``. If there is no cache file
        with the specified name in the cache directory, it does nothing
        but returns ``False``.

        Returns:
            bool: Returns True if succeeded.

        .. note:: This feature is experimental.

        '''
        if self._frozen:
            if self.verbose:
                print("Failed to preload the cache from {}: "
                      "The cache is already frozen."
                      .format(name))
            return False

        cachefile = os.path.join(self.dir, name)

        if not os.path.exists(cachefile):
            if self.verbose:
                print('Failed to preload the cache from {}: '
                      'The specified cache not found in {}'
                      .format(name, self.dir))
            return False

        with self.lock.wrlock():
            self.cachefp.close()
            self.cachefp = open(cachefile, 'rb')
            self._frozen = True
        return True
    def preserve(self, name, overwrite=False):
        '''Preserve the cache as a persistent file on the disk

        Saves the current cache into a file named ``name`` in the cache
        directory. Once the cache is preserved, the cache file will not be
        removed at cache close. To read data from the preserved file, use
        the ``preload()`` method. After preservation, no data can be added
        to the cache.

        When it succeeds, it returns ``True``. If a cache file with the
        same name already exists in the cache directory, it does nothing
        but returns ``False``.

        The preserved cache can also be preloaded by
        :class:`~MultiprocessFileCache`.

        Arguments:
            name (str): Name of the preserved file. The file is created in
                the same directory as the cache (``dir`` option to
                ``__init__``).

            overwrite (bool): Overwrite if the file already exists.

        Returns:
            bool: Returns True if succeeded.

        .. note:: This feature is experimental.

        '''
        cachefile = os.path.join(self.dir, name)

        if overwrite:
            if os.path.exists(cachefile):
                os.unlink(cachefile)

        elif os.path.exists(cachefile):
            if self.verbose:
                print('Specified cache named "{}" already exists in {}'
                      .format(name, self.dir))
            return False

        with self.lock.wrlock():
            # Hard-link the temporary cache file to the given name and
            # reopen it read-only.
            os.link(self.cachefp.name, cachefile)
            self.cachefp.close()
            self.cachefp = open(cachefile, 'rb')
            self._frozen = True
        return True
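# On-disk layout sketch (derived from ``__init__`` and ``_put`` above): the
# first ``16 * length`` bytes hold one ``pack('Qq', offset, length)`` record
# per index, initialized to ``(0, -1)``; each ``put()`` appends the raw data
# at the end of the file and rewrites the corresponding record to point at it.
#
# Usage sketch (illustrative, not part of the original module; the file name
# ``'dataset.cache'`` is hypothetical)::
#
#     from pfio.cache import FileCache
#
#     cache = FileCache(length=3, do_pickle=True)
#     if cache.get(0) is None:            # a miss returns None
#         cache.put(0, {"sample": 0})     # pickled automatically
#     assert cache.get(0) == {"sample": 0}
#
#     # Persist the cache under a name in the cache directory; the cache is
#     # frozen afterwards and accepts no further put().
#     cache.preserve('dataset.cache')
#
#     # Later (e.g. in another process), open a fresh cache of the same
#     # length and attach the preserved file to it:
#     cache2 = FileCache(length=3, do_pickle=True)
#     cache2.preload('dataset.cache')
#     assert cache2.get(0) == {"sample": 0}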