# Source code for zict.lmdb

from __future__ import annotations

import pathlib
import sys
from collections.abc import ItemsView, Iterable, Iterator, ValuesView

from zict.common import ZictBase


def _encode_key(key: str) -> bytes:
    """Return the UTF-8 byte representation of the string *key*."""
    encoded = key.encode(encoding="utf-8")
    return encoded


def _decode_key(key: bytes) -> str:
    """Return the string obtained by decoding the UTF-8 bytes *key*."""
    decoded = key.decode(encoding="utf-8")
    return decoded


class LMDB(ZictBase[str, bytes]):
    """Mutable Mapping interface to a LMDB database.

    Keys must be strings, values must be bytes

    Parameters
    ----------
    directory: str
    map_size: int
        On Linux and MacOS, maximum size of the database file on disk.
        Defaults to 1 TiB on 64 bit systems and to a quarter of ``sys.maxsize``
        (roughly 512 MiB) on 32 bit ones.

        On Windows, preallocated total size of the database file on disk. Defaults to
        10 MiB to encourage explicitly setting it.

    Notes
    -----
    None of this class is thread-safe - not even normally trivial methods such as
    ``__len__`` or ``__contains__``.

    Examples
    --------
    >>> z = LMDB('/tmp/somedir/')  # doctest: +SKIP
    >>> z['x'] = b'123'  # doctest: +SKIP
    >>> z['x']  # doctest: +SKIP
    b'123'
    """

    def __init__(self, directory: str | pathlib.Path, map_size: int | None = None):
        # Imported here rather than at module level, so the dependency is only
        # needed when an LMDB mapping is actually created.
        import lmdb

        super().__init__()
        if map_size is None:
            if sys.platform != "win32":
                # 1 TiB, capped at a quarter of sys.maxsize on narrower platforms
                map_size = min(2**40, sys.maxsize // 4)
            else:
                # 10 MiB
                map_size = 10 * 2**20

        # NOTE(review): sync=False / writemap=True are py-lmdb tuning flags,
        # presumably trading durability for write speed - confirm against the
        # py-lmdb documentation before changing them.
        self.db = lmdb.open(
            str(directory),
            subdir=True,
            map_size=map_size,
            sync=False,
            writemap=True,
        )

    def __getitem__(self, key: str) -> bytes:
        """Return the value stored under *key*.

        Raises KeyError if *key* is not a str or is not in the database.
        """
        if not isinstance(key, str):
            raise KeyError(key)
        with self.db.begin() as txn:
            value = txn.get(_encode_key(key))
        if value is None:
            raise KeyError(key)
        return value

    def __setitem__(self, key: str, value: bytes) -> None:
        """Store *value* under *key*.

        Raises TypeError if *key* is not a str or *value* is not bytes.
        """
        if not isinstance(key, str):
            raise TypeError(key)
        if not isinstance(value, bytes):
            raise TypeError(value)
        with self.db.begin(write=True) as txn:
            txn.put(_encode_key(key), value)

    def __contains__(self, key: object) -> bool:
        """Return whether *key* is stored; non-str keys are never contained."""
        if not isinstance(key, str):
            return False
        with self.db.begin() as txn:
            return txn.cursor().set_key(_encode_key(key))

    def __iter__(self) -> Iterator[str]:
        """Iterate over the decoded keys of the database.

        The read transaction is started when this method is called and is
        released as soon as the returned iterator is exhausted or closed,
        instead of waiting for the iterator to be garbage collected.
        """
        # Begin eagerly so that the iterator observes the database as of the
        # iter() call, not as of the first next() call.
        txn = self.db.begin()

        def iter_keys() -> Iterator[str]:
            with txn:
                for raw_key in txn.cursor().iternext(keys=True, values=False):
                    yield _decode_key(raw_key)

        return iter_keys()

    def items(self) -> ItemsView[str, bytes]:
        """Return a view of the (key, value) pairs."""
        return LMDBItemsView(self)

    def values(self) -> ValuesView[bytes]:
        """Return a view of the values."""
        return LMDBValuesView(self)

    def _do_update(self, items: Iterable[tuple[str, bytes]]) -> None:
        """Write all *items* in one write transaction.

        Raises TypeError if any key is not a str or any value is not bytes.
        """
        # Optimized version of update() using a single putmulti() call.
        # Every item is validated and encoded before the write transaction
        # begins, so a TypeError is raised before anything is written.
        items_enc = []
        for key, value in items:
            if not isinstance(key, str):
                raise TypeError(key)
            if not isinstance(value, bytes):
                raise TypeError(value)
            items_enc.append((_encode_key(key), value))

        with self.db.begin(write=True) as txn:
            consumed, added = txn.cursor().putmulti(items_enc)
            assert consumed == added == len(items_enc)

    def __delitem__(self, key: str) -> None:
        """Delete *key*.

        Raises KeyError if *key* is not a str or is not in the database.
        """
        if not isinstance(key, str):
            raise KeyError(key)
        with self.db.begin(write=True) as txn:
            if not txn.delete(_encode_key(key)):
                raise KeyError(key)

    def __len__(self) -> int:
        """Return the ``entries`` count reported by the database statistics."""
        return self.db.stat()["entries"]

    def close(self) -> None:
        """Close the underlying database environment."""
        self.db.close()
class LMDBItemsView(ItemsView[str, bytes]):
    """View of the (key, value) pairs of an LMDB mapping.

    Iteration reads keys and values together through a single cursor.
    """

    _mapping: LMDB  # FIXME CPython implementation detail
    __slots__ = ()

    def __contains__(self, item: object) -> bool:
        """Return whether *item* is a (key, value) pair stored in the mapping."""
        key: str
        value: object
        key, value = item  # type: ignore
        try:
            v = self._mapping[key]
        except KeyError:
            return False
        else:
            return v == value

    def __iter__(self) -> Iterator[tuple[str, bytes]]:
        """Iterate over (decoded key, value) pairs.

        The read transaction is started when this method is called and is
        released as soon as the returned iterator is exhausted or closed,
        instead of waiting for the iterator to be garbage collected.
        """
        # Begin eagerly so that the iterator observes the database as of the
        # iter() call, not as of the first next() call.
        txn = self._mapping.db.begin()

        def iter_items() -> Iterator[tuple[str, bytes]]:
            with txn:
                for raw_key, value in txn.cursor().iternext(keys=True, values=True):
                    yield _decode_key(raw_key), value

        return iter_items()


class LMDBValuesView(ValuesView[bytes]):
    """View of the values of an LMDB mapping.

    Iteration reads the values through a cursor without reading the keys.
    """

    _mapping: LMDB  # FIXME CPython implementation detail
    __slots__ = ()

    def __contains__(self, value: object) -> bool:
        """Return whether any stored value equals *value* (linear scan)."""
        return any(value == v for v in self)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the stored values.

        The read transaction is started when this method is called and is
        released as soon as the returned iterator is exhausted or closed,
        instead of waiting for the iterator to be garbage collected.
        """
        # Begin eagerly so that the iterator observes the database as of the
        # iter() call, not as of the first next() call.
        txn = self._mapping.db.begin()

        def iter_values() -> Iterator[bytes]:
            with txn:
                yield from txn.cursor().iternext(keys=False, values=True)

        return iter_values()