Source code for pfio.cache.multiprocess_file_cache

import errno
import fcntl
import numbers
import os
import pickle
import tempfile
import warnings
from struct import calcsize, pack, unpack

from pfio import cache
from pfio.cache.file_cache import _check_local, _default_cache_path

class _NoOpenNamedTemporaryFile(object):
    """Temporary file class

    This class warps mkstemp and implements auto-clean mechanism.
    The reason why we cannot use the tempfile.NamedTemporaryFile is that
    it has an unpicklable member because it opens the created temporary file,
    which makes it impossible to pass over to worker processes.

    The auto cleanup mechanism is based on CPython tempfile implementation.

    # Set here since __del__ checks it
    name = None
    master_pid = None

    def __init__(self, dir, master_pid):
        _, = tempfile.mkstemp(dir=dir)
        self.master_pid = master_pid

    def close(self, unlink=os.unlink, getpid=os.getpid):
        if and self.master_pid == getpid():
   = None

    def __del__(self):

class _DummyTemporaryFile(object):
    """Dummy tempfile class that imitates the _NoOpenNamedTemporaryFile

    This class is used for MultiprocessFileCache.preload.
    The cache file fed from outside shouldn't be automatically deleted
    by close(), so it uses this dummy cache class.

    def __init__(self, name): = name

    def close(self):

[docs] class MultiprocessFileCache(cache.Cache): '''The Multiprocess-safe cache system on a local filesystem Stores cache data in a local temporary file, created in ``~/.pfio/cache`` by default. It automatically deletes the cache data after the object is collected. When this object is not correctly closed (e.g., the process killed by SIGKILL), the cache remains after the process's death. This class supports handling a cache from multiple processes. A MultiprocessFileCache object can be handed over to another process through the pickle. Calling ``get`` and ``put`` in each process will look into the same cache file with flock-based locking. The temporary cache file will persist as long as the MultiprocessFileCache object is alive in the original process that creates it. Therefore, even after destroying the worker processes, the MultiprocessFileCache object can still be passed to another process. .. admonition:: Example Using MultiprocessFileCache is similar to the :class:`~NaiveCache` and :class:`~FileCache`. :: from pfio.cache import MultiprocessFileCache class MyDataset( def __init__(self, image_paths): self.paths = image_paths self.cache = MultiprocessFileCache(len(image_paths), do_pickle=True) ... When iterating over the dataset, it is common to load the data concurrently to hide file IO bottleneck by setting higher ``num_workers`` in PyTorch DataLoader. :: image_paths = open('/path/to/image_list.txt').read().splitlines() dataset = MyDataset(image_paths) loader = DataLoader(dataset, batch_size=64, num_workers=8) # Parallel data loading for epoch in range(10): for batch in loader: ... In this case, the dataset is distributed to each worker process i.e., ``__getitem__`` of the dataset will be called in a different process that initialized it. The ``MultiprocessFileCache`` object held by the dataset in each worker looks at the same cache file and handles the concurrent access based on the ``flock`` system call. Therefore the data inserted to the cache by a worker process can be accessed from another worker process safely. In case your task does not require concurrent data loading, i.e., ``num_workers=0`` in DataLoader, consider using :class:`~FileCache` as it has less overhead for concurrency control. The persisted cache file created by ``preserve()`` can be used for :meth:`FileCache.preload` and vice versa. Arguments: length (int): Length of the cache array. do_pickle (bool): Do automatic pickle and unpickle inside the cache. dir (str): The path to the directory to place cache data in case home directory is not backed by fast storage device. Must not be an NFS. cache_size_limit (None or int): Limitation of the cache size in bytes. If the total amount of cached data reaches the limit, the cache will become frozen and no longer acccept further addition. Data already stored in the cache can be accessed normally. None (default) and 0 is unlimited. verbose (bool): Print detailed logs of the cache. ''' # NOQA def __init__(self, length, do_pickle=False, dir=None, cache_size_limit=None, verbose=False): self.length = length self.do_pickle = do_pickle self.verbose = verbose if self.length <= 0 or (2 ** 64) <= self.length: raise ValueError("length has to be between 0 and 2^64") if not (cache_size_limit is None or (isinstance(cache_size_limit, numbers.Number) and 0 <= cache_size_limit)): msg = "cache_size_limit has to be either None, zero " \ "(both indicate unlimited) or larger than 0. " \ "{} is specified.".format(cache_size_limit) raise ValueError(msg) self.cache_size_limit = cache_size_limit if dir is None: self.dir = _default_cache_path() else: self.dir = dir os.makedirs(self.dir, exist_ok=True) _check_local(self.dir) self.closed = False self._frozen = False self._master_pid = os.getpid() self.cache_file = _NoOpenNamedTemporaryFile(self.dir, self._master_pid) cache_fd =, os.O_RDWR) if self.verbose: print('created cache file:', try: fcntl.flock(cache_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # Fill up indices part of the cache file by index=0, size=-1 buf = pack('Qq', 0, -1) self.buflen = calcsize('Qq') assert self.buflen == 16 for i in range(self.length): offset = self.buflen * i r = os.pwrite(cache_fd, buf, offset) assert r == self.buflen except OSError as ose: # Lock acquisition error -> No problem, since other worker # should be already working on it if ose.errno not in (errno.EACCES, errno.EAGAIN): raise finally: fcntl.flock(cache_fd, fcntl.LOCK_UN) os.close(cache_fd) # Open lazily at the first call of get or put in each child process self._fd_pid = None self.cache_fd = None def __len__(self): return self.length @property def multiprocess_safe(self) -> bool: return True @property def multithread_safe(self) -> bool: return True def get(self, i): if self.closed: return data = self._get(i) if self.do_pickle and data: data = pickle.loads(data) return data def _open_fds(self): pid = os.getpid() if self._fd_pid != pid: self._fd_pid = pid self.cache_fd =, os.O_RDWR) def _get(self, i): if i < 0 or self.length <= i: raise IndexError("index {} out of range ([0, {}])" .format(i, self.length - 1)) self._open_fds() offset = self.buflen * i fcntl.flock(self.cache_fd, fcntl.LOCK_SH) index_entry = os.pread(self.cache_fd, self.buflen, offset) (o, l) = unpack('Qq', index_entry) if l < 0 or o < 0: fcntl.flock(self.cache_fd, fcntl.LOCK_UN) return None data = os.pread(self.cache_fd, l, o) assert len(data) == l fcntl.flock(self.cache_fd, fcntl.LOCK_UN) return data def put(self, i, data): if self._frozen or self.closed: return False try: if self.do_pickle: data = pickle.dumps(data) return self._put(i, data) except OSError as ose: # Disk full (ENOSPC) possibly by cache; just warn and keep running if ose.errno == errno.ENOSPC: warnings.warn(ose.strerror, RuntimeWarning) return False else: raise ose def _put(self, i, data): if self.closed: return False if i < 0 or self.length <= i: raise IndexError("index {} out of range ([0, {}])" .format(i, self.length - 1)) self._open_fds() index_ofst = self.buflen * i fcntl.flock(self.cache_fd, fcntl.LOCK_EX) buf = os.pread(self.cache_fd, self.buflen, index_ofst) (o, l) = unpack('Qq', buf) if l >= 0 and o >= 0: # Already data exists fcntl.flock(self.cache_fd, fcntl.LOCK_UN) return False data_pos = os.lseek(self.cache_fd, 0, os.SEEK_END) if self.cache_size_limit: if self.cache_size_limit < (data_pos + len(data)): self._frozen = True fcntl.flock(self.cache_fd, fcntl.LOCK_UN) return False index_entry = pack('Qq', data_pos, len(data)) assert os.pwrite(self.cache_fd, index_entry, index_ofst) == self.buflen assert os.pwrite(self.cache_fd, data, data_pos) == len(data) os.fsync(self.cache_fd) fcntl.flock(self.cache_fd, fcntl.LOCK_UN) return True def __enter__(self): return self def __exit__(self, *exc): self.close() def close(self): pid = os.getpid() if pid == self._fd_pid: os.close(self.cache_fd) self._fd_pid = None if not self.closed and pid == self._master_pid: self.cache_file.close() self.closed = True self.cache_file = None self.cache_fd = None
[docs] def preload(self, name): '''Load the cache saved by ``preserve()`` After loading the file, no data can be added to the cache. ``name`` is the name of the persistent file in the cache directory. When it succeeds, it returns ``True``. If there is no cache file with the specified name in the cache directory, it will do nothing but return ``False``. Be noted that ``preload()`` can be called only by the master process i.e., the process where ``__init__()`` is called, in order to prevent inconsistency. When using in a multiprocessing environment, you first need to create a ``MultiprocessFileCache`` object, call its ``preload()`` and then pass it to the worker processes. Returns: bool: Returns True if succeed. .. note:: This feature is experimental. ''' if self._frozen: if self.verbose: print("Failed to preload the cache from {}: " "The cache is already frozen." .format(name)) return False if self._master_pid != os.getpid(): raise RuntimeError("Cannot preload a cache in a worker process") # Overwrite the current cache by the specified cache file. # This is needed to prevent the specified cache file are deleted when # the cache object is destroyed. ld_cache_file = os.path.join(self.dir, name) if not os.path.exists(ld_cache_file): if self.verbose: print('Failed to ploread the cache from {}: ' 'The specified cache not found in {}' .format(name, self.dir)) return False self.cache_file.close() self.cache_fd = None self.cache_file = _DummyTemporaryFile(ld_cache_file) self._frozen = True return True
[docs] def preserve(self, name, overwrite=False): '''Preserve the cache as a persistent file on the disk Once the cache is preserved, the cache file will not be removed at cache close. To read data from preserved file, use ``preload()`` method. After preservation, no data can be added to the cache. ``name`` is the name of the persistent files saved into the cache directory. When it succeeds, it returns ``True``. If there is a cache file with the same name already exists in the cache directory, it will do nothing but return ``False``. Be noted that ``preserve()`` can be called only by the master process i.e., the process where ``__init__()`` is called, in order to prevent inconsistency. The preserved cache can also be preloaded by :class:`~FileCache`. Arguments: name (str): Prefix of the preserved file names. ``(name).cachei`` and ``(name).cached`` are created. The files are created in the same directory as the cache (``dir`` option to ``__init__``). overwrite (bool): Overwrite if already exists Returns: bool: Returns True if succeed. .. note:: This feature is experimental. ''' if self._master_pid != os.getpid(): raise RuntimeError("Cannot preserve a cache in a worker process") cache_file = os.path.join(self.dir, name) if overwrite: if os.path.exists(cache_file): os.unlink(cache_file) elif os.path.exists(cache_file): if self.verbose: print('Specified cache named "{}" already exists in {}' .format(name, self.dir)) return False self._open_fds() try: fcntl.flock(self.cache_fd, fcntl.LOCK_EX), cache_file) except OSError as ose: # Lock acquisition error -> No problem, since other worker # should be already working on it if ose.errno not in (errno.EACCES, errno.EAGAIN): raise finally: fcntl.flock(self.cache_fd, fcntl.LOCK_UN) return True