Source code for zict.file

from __future__ import annotations

import mmap
import os
import pathlib
from collections.abc import Iterator
from urllib.parse import quote, unquote

from zict.common import ZictBase, locked


[docs] class File(ZictBase[str, bytes]): """Mutable Mapping interface to a directory Keys must be strings, values must be buffers Note this shouldn't be used for interprocess persistence, as keys are cached in memory. Parameters ---------- directory: str Directory to write to. If it already exists, existing files will be imported as mapping elements. If it doesn't exists, it will be created. memmap: bool (optional) If True, use `mmap` for reading. Defaults to False. Notes ----- If you call methods of this class from multiple threads, access will be fast as long as atomic disk access such as ``open``, ``os.fstat``, and ``os.remove`` is fast. This is not always the case, e.g. in case of slow network mounts or spun-down magnetic drives. Bytes read/write in the files is not protected by locks; this could cause failures on Windows, NFS, and in general whenever it's not OK to delete a file while there are file descriptors open on it. Examples -------- >>> z = File('myfile') # doctest: +SKIP >>> z['x'] = b'123' # doctest: +SKIP >>> z['x'] # doctest: +SKIP b'123' Also supports writing lists of bytes objects >>> z['y'] = [b'123', b'4567'] # doctest: +SKIP >>> z['y'] # doctest: +SKIP b'1234567' Or anything that can be used with file.write, like a memoryview >>> z['data'] = np.ones(5).data # doctest: +SKIP """ directory: str memmap: bool filenames: dict[str, str] _inc: int def __init__(self, directory: str | pathlib.Path, memmap: bool = False): super().__init__() self.directory = str(directory) self.memmap = memmap self.filenames = {} self._inc = 0 if not os.path.exists(self.directory): os.makedirs(self.directory, exist_ok=True) else: for fn in os.listdir(self.directory): self.filenames[self._unsafe_key(fn)] = fn self._inc += 1 def _safe_key(self, key: str) -> str: """Escape key so that it is usable on all filesystems. Append to the filenames a unique suffix that changes every time this method is called. This prevents race conditions when another thread accesses the same key, e.g. ``__setitem__`` on one thread and ``__getitem__`` on another. """ # `#` is escaped by quote and is supported by most file systems key = quote(key, safe="") + f"#{self._inc}" self._inc += 1 return key @staticmethod def _unsafe_key(key: str) -> str: """Undo the escaping done by _safe_key()""" key = key.split("#")[0] return unquote(key) def __str__(self) -> str: return f"<File: {self.directory}, {len(self)} elements>" __repr__ = __str__ @locked def __getitem__(self, key: str) -> bytearray | memoryview: fn = os.path.join(self.directory, self.filenames[key]) # distributed.protocol.numpy.deserialize_numpy_ndarray makes sure that, if the # numpy array was writeable before serialization, remains writeable afterwards. # If it receives a read-only buffer (e.g. from fh.read() or from a mmap to a # read-only file descriptor), it performs an expensive memcpy. # Note that this is a dask-specific feature; vanilla pickle.loads will instead # return an array with flags.writeable=False. if self.memmap: with open(fn, "r+b") as fh: return memoryview(mmap.mmap(fh.fileno(), 0)) else: with open(fn, "rb") as fh: size = os.fstat(fh.fileno()).st_size buf = bytearray(size) with self.unlock(): nread = fh.readinto(buf) assert nread == size return buf @locked def __setitem__( self, key: str, value: bytes | bytearray | memoryview | list[bytes | bytearray | memoryview] | tuple[bytes | bytearray | memoryview, ...], ) -> None: self.discard(key) fn = self._safe_key(key) with open(os.path.join(self.directory, fn), "wb") as fh, self.unlock(): if isinstance(value, (tuple, list)): fh.writelines(value) else: fh.write(value) if key in self.filenames: # Race condition: two calls to __setitem__ from different threads on the # same key at the same time os.remove(os.path.join(self.directory, fn)) else: self.filenames[key] = fn def __contains__(self, key: object) -> bool: return key in self.filenames def __iter__(self) -> Iterator[str]: return iter(self.filenames) @locked def __delitem__(self, key: str) -> None: fn = self.filenames.pop(key) os.remove(os.path.join(self.directory, fn)) def __len__(self) -> int: return len(self.filenames)