From 1f1753264d71a8e8641b3920193503f8c4dee643 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Wed, 17 Jun 2026 23:31:21 +1000 Subject: [PATCH 1/4] feat: zero-copy mmap read path for large plaintext Arrow on the File backend (#171) The "memory-mapped reads" advertised for uncompressed Arrow never existed: the File backend read the whole file onto the heap with os.read. This builds the real path. FileBackend.get_buffer() memory-maps a value (POSIX) and returns a borrowed zero-copy view of the payload. get_cached_value(_async) takes this fast path for plaintext Arrow that returns pandas, deserializing straight over the mmap (pa.py_buffer is zero-copy) and closing the handle in a finally. Measured: a 150 MB value reads at ~0.1 MB RSS via mmap vs ~157 MB via os.read. Scope and safety (from the design review): - POSIX only. Windows pins mapped files against rename/unlink, which would break the atomic write-then-rename set() path; Windows keeps the os.read path. - Plaintext + pandas only. Encrypted values can never mmap (AES-GCM decrypt owns its buffer); a pyarrow.Table return aliases the mapping (use-after-free on close). - Security: the fd is opened O_NOFOLLOW and the header validated before mapping; we never use a path-based mmap (it would follow an attacker-swapped symlink). - The mmap is confined to the deserialize call frame and never reaches L1; a belt-and-suspenders guard in L1Cache.put rejects non-bytes so any future regression fails loud instead of pinning a mapped inode for the entry's TTL. - Read-side size ceiling (512 MB) independent of max_value_mb; larger files fall back to os.read. The eager xxHash3 integrity check is kept (it faults into reclaimable page cache, not heap, so the steady-state RSS win survives). No protocol/Rust/TS/SaaS change. Compression stays a write-side concern (Arrow IPC self-describes on read), so no serializer-wiring change was needed. 
--- .secrets.baseline | 4 +- src/cachekit/backends/base.py | 35 ++++ src/cachekit/backends/file/backend.py | 114 +++++++++++++ src/cachekit/cache_handler.py | 77 ++++++++- src/cachekit/config/settings.py | 5 +- src/cachekit/l1_cache.py | 16 ++ src/cachekit/serializers/arrow_serializer.py | 9 +- tests/performance/test_file_backend_perf.py | 45 ++++++ tests/unit/backends/test_file_backend.py | 72 +++++++++ tests/unit/test_l1_memory_bounds.py | 28 ++++ tests/unit/test_mmap_read_path.py | 159 +++++++++++++++++++ 11 files changed, 555 insertions(+), 9 deletions(-) create mode 100644 tests/unit/test_mmap_read_path.py diff --git a/.secrets.baseline b/.secrets.baseline index 7a9b102..9a50a92 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -213,7 +213,7 @@ "filename": "src/cachekit/cache_handler.py", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 257 + "line_number": 266 } ], "src/cachekit/config/decorator.py": [ @@ -425,5 +425,5 @@ } ] }, - "generated_at": "2026-06-11T01:10:45Z" + "generated_at": "2026-06-17T13:30:40Z" } diff --git a/src/cachekit/backends/base.py b/src/cachekit/backends/base.py index 6e3d1e8..eec6e81 100644 --- a/src/cachekit/backends/base.py +++ b/src/cachekit/backends/base.py @@ -175,6 +175,41 @@ async def refresh_ttl(self, key: str, ttl: int) -> bool: ... +@runtime_checkable +class BufferHandle(Protocol): + """A borrowed, zero-copy view of a cached value plus the resource backing it. + + Returned by ``BufferReadableBackend.get_buffer``. ``view`` aliases backend-owned memory (e.g. + mmap'd file pages), not a heap copy — so the consumer must finish reading and ``close()`` + before the view is touched again. The view DANGLES after close (touching it can segfault), so + it must never be stored (e.g. in L1) nor returned past the read call frame (#171). 
+ """ + + view: memoryview + """Zero-copy view of the payload (valid only until close()).""" + + def close(self) -> None: + """Release the view and its backing resource. Idempotent.""" + ... + + +@runtime_checkable +class BufferReadableBackend(Protocol): + """Optional protocol for backends that can return a zero-copy buffer instead of materializing + the whole value on the heap. + + Lets large plaintext values (e.g. uncompressed Arrow IPC) be read without copying the payload. + Only the File backend implements this today (mmap, POSIX). Backends that don't implement it + are simply read via ``get`` as usual. + """ + + def get_buffer(self, key: str) -> Optional[BufferHandle]: + """Return a borrowed zero-copy handle for ``key``, or None when the value is not mappable + (missing/expired/too large/non-POSIX) — the caller then falls back to ``get``. The caller + MUST ``close()`` the handle when done reading.""" + ... + + @runtime_checkable class LockableBackend(Protocol): """Optional protocol for backends supporting distributed locking. diff --git a/src/cachekit/backends/file/backend.py b/src/cachekit/backends/file/backend.py index e0b19c5..24aac1c 100644 --- a/src/cachekit/backends/file/backend.py +++ b/src/cachekit/backends/file/backend.py @@ -13,6 +13,7 @@ import errno import hashlib +import mmap import os import platform import struct @@ -47,6 +48,40 @@ # TTL bounds (security: prevent integer overflow) MAX_TTL_SECONDS: int = 10 * 365 * 24 * 60 * 60 # 10 years max +# Read-side mmap ceiling (#171, fork 4): a fixed internal cap independent of max_value_mb so a +# misconfigured huge max_value_mb (or an out-of-band file dropped in cache_dir) can't map an +# unbounded region. Above this, get_buffer() returns None and the caller falls back to os.read. +MMAP_MAX_BYTES: int = 512 * 1024 * 1024 # 512 MB + + +class _MmapHandle: + """Owns a read-only mmap of a cache file plus a memoryview of its payload (past the 14-byte + header). 
Zero-copy: the view aliases mapped pages, never a heap copy. + + The CALLER must ``close()`` once the consumer is done (after Arrow deserialize has copied the + data out via to_pandas). The view DANGLES after close — touching it segfaults — so the handle + must never escape the deserialize call frame and must never be stored in L1 (#171 blocker C). + """ + + __slots__ = ("_mm", "view") + + def __init__(self, mm: mmap.mmap) -> None: + self._mm = mm + # Slice past the 14-byte header; the slice exports its buffer directly from `mm`, so it is + # the only export to release before mm.close(). + self.view: memoryview = memoryview(mm)[HEADER_SIZE:] + + def close(self) -> None: + """Release the view then the mapping. Idempotent (safe to call more than once).""" + try: + self.view.release() # must release exports before mmap.close(), else BufferError + except (ValueError, BufferError): + pass # already released + try: + self._mm.close() + except (ValueError, BufferError): + pass # already closed + class FileBackend: """File-based backend for local disk caching. @@ -189,6 +224,85 @@ def get(self, key: str) -> bytes | None: key=key, ) from exc + def get_buffer(self, key: str) -> _MmapHandle | None: + """Memory-map a cache value for a zero-copy read of its payload (POSIX only; #171). + + Returns an `_MmapHandle` owning the mmap + a memoryview of the payload (past the 14-byte + header), or `None` when mmap does not apply — the caller then falls back to `get()`: + - non-POSIX platform (Windows pins mapped files against rename/unlink); + - missing / expired / corrupt entry (corrupt + expired are unlinked, mirroring `get`); + - empty payload (nothing to map); + - file larger than ``MMAP_MAX_BYTES``. + + Security: the fd is opened with ``O_NOFOLLOW`` and the header is validated from the fd + BEFORE mapping. 
We never use ``pa.memory_map(path)`` / a path-based mmap — that re-opens + by path and would follow an attacker-swapped symlink, reintroducing the TOCTOU that + ``O_NOFOLLOW`` closes. The mapping survives the fd close on POSIX. + """ + if os.name != "posix": + return None # mapped files can't be renamed/unlinked on Windows; caller uses get() + + file_path = self._key_to_path(key) + + with self._lock: + try: + fd = os.open(file_path, os.O_RDONLY | os.O_NOFOLLOW) + except FileNotFoundError: + return None + except OSError as exc: + if exc.errno in (errno.ENOENT, errno.ELOOP): + return None # missing, or symlink rejected by O_NOFOLLOW + raise BackendError( + f"Failed to open cache file for mmap: {exc}", + error_type=self._classify_os_error(exc, is_directory=False), + original_exception=exc, + operation="get_buffer", + key=key, + ) from exc + + mm: mmap.mmap | None = None + try: + self._acquire_file_lock(fd, exclusive=False) + try: + st_size = os.fstat(fd).st_size + + # Validate-then-map: never map a file we're about to delete. + if st_size < HEADER_SIZE: + self._safe_unlink(file_path) + return None + header = os.read(fd, HEADER_SIZE) + if header[0:2] != MAGIC or header[2] != FORMAT_VERSION: + self._safe_unlink(file_path) + return None + expiry_timestamp = struct.unpack(">Q", header[6:14])[0] + if expiry_timestamp > 0 and time.time() > expiry_timestamp: + self._safe_unlink(file_path) + return None + + # Empty payload (header only): nothing to map (mmap rejects length 0 anyway). + # Too large: fall back to os.read so we never map an unbounded region. 
+ if st_size <= HEADER_SIZE or st_size > MMAP_MAX_BYTES: + return None + + mm = mmap.mmap(fd, st_size, access=mmap.ACCESS_READ) + handle = _MmapHandle(mm) + mm = None # ownership transferred to the handle; don't close it in finally + return handle + finally: + self._release_file_lock(fd) + except OSError as exc: + raise BackendError( + f"Failed to mmap cache file: {exc}", + error_type=self._classify_os_error(exc, is_directory=False), + original_exception=exc, + operation="get_buffer", + key=key, + ) from exc + finally: + if mm is not None: + mm.close() + os.close(fd) + def set(self, key: str, value: bytes, ttl: int | None = None) -> None: """Store value in file storage with atomic write. diff --git a/src/cachekit/cache_handler.py b/src/cachekit/cache_handler.py index a3a36e1..62a50dd 100644 --- a/src/cachekit/cache_handler.py +++ b/src/cachekit/cache_handler.py @@ -11,7 +11,7 @@ from collections.abc import Callable from typing import TYPE_CHECKING, Any, Optional, Protocol, TypeGuard, Union, runtime_checkable -from cachekit.backends.base import BackendError, BaseBackend, TTLInspectableBackend +from cachekit.backends.base import BackendError, BaseBackend, BufferHandle, BufferReadableBackend, TTLInspectableBackend from cachekit.backends.provider import ( BackendProviderInterface, DefaultBackendProvider, @@ -85,6 +85,15 @@ def supports_ttl_inspection(backend: BaseBackend) -> TypeGuard[TTLInspectableBac return hasattr(backend, "get_ttl") and hasattr(backend, "refresh_ttl") +def supports_buffer_read(backend: BaseBackend) -> TypeGuard[BufferReadableBackend]: + """Type guard: backend can return a zero-copy buffer via get_buffer (#171, File/POSIX only). + + Returns: + True if backend implements BufferReadableBackend (used for the mmap Arrow read fast path). 
+ """ + return hasattr(backend, "get_buffer") + + # Import caching for serializer modules # # PERFORMANCE OPTIMIZATION: Dynamic imports are expensive (~100μs per import) @@ -640,7 +649,25 @@ def serialize_data( get_logger().error(f"Serialization failed with {self.serializer_name}: {e}") raise SerializationError(f"Failed to serialize data with {self.serializer_name}: {e}") from e - def deserialize_data(self, data: str | bytes, cache_key: str = "") -> Any: + def supports_mmap_read(self) -> bool: + """True iff reads can use the zero-copy mmap fast path (#171). + + Eligible only for PLAINTEXT Arrow that returns pandas: + - encrypted values can never mmap (AES-GCM decrypt owns its buffer); + - non-Arrow serializers gain nothing (they copy at the Rust/C boundary, rebuild objects); + - the "arrow" return_format yields a table that ALIASES the mapped pages, so closing the + handle would be a use-after-free — pandas (which copies out via to_pandas) only. + + The backend must also support buffer reads (File/POSIX); that is checked separately, so a + True here on a non-File backend simply means get_buffer returns None and we fall back. + """ + return ( + not self.encryption + and self._serializer_string_name == "arrow" + and getattr(self._base_serializer, "return_format", None) == "pandas" + ) + + def deserialize_data(self, data: str | bytes | memoryview, cache_key: str = "") -> Any: """Deserialize data from cache storage with cache_key verification. Args: @@ -845,6 +872,19 @@ def get_cached_value(self, cache_key: str, refresh_ttl: Optional[int] = None) -> if self._cache_handler is None: raise RuntimeError("Cache handler must be set before calling get_cached_value") + # mmap fast path (#171): plaintext Arrow -> pandas on a buffer-readable backend (File, + # POSIX) reads zero-copy. The handle is confined to this frame and closed in `finally`, + # so the mmap never becomes the returned value and never reaches L1 (blocker C). 
A None + # from get_buffer (ineligible file, or a non-buffer backend) falls through to bytes. + if self.serialization_handler.supports_mmap_read(): + handle = self._cache_handler.get_buffer(cache_key) + if handle is not None: + try: + get_logger().cache_hit(cache_key, "Backend(mmap)") + return (True, self.serialization_handler.deserialize_data(handle.view, cache_key)) + finally: + handle.close() + cached_data = self._cache_handler.get(cache_key, refresh_ttl) if cached_data is not None: get_logger().cache_hit(cache_key, "Backend") @@ -886,6 +926,18 @@ async def get_cached_value_async(self, cache_key: str, refresh_ttl: Optional[int if self._cache_handler is None: raise RuntimeError("Cache handler must be set before calling get_cached_value_async") + # mmap fast path (#171): same as the sync path. mmap setup is a fast local syscall and + # the existing async path already deserializes synchronously here, so this adds no new + # event-loop blocking. The handle is confined to this frame; the mmap never reaches L1. + if self.serialization_handler.supports_mmap_read(): + handle = self._cache_handler.get_buffer(cache_key) + if handle is not None: + try: + get_logger().cache_hit(cache_key, "Backend(mmap)") + return (True, self.serialization_handler.deserialize_data(handle.view, cache_key)) + finally: + handle.close() + cached_data = await self._cache_handler.get_async(cache_key, refresh_ttl) if cached_data is not None: get_logger().cache_hit(cache_key, "Backend") @@ -1100,6 +1152,10 @@ def get(self, key: str, refresh_ttl: Optional[int] = None) -> Optional[bytes]: """Get value from cache with optional TTL refresh.""" ... + def get_buffer(self, key: str) -> Optional[BufferHandle]: + """Return a zero-copy buffer handle if the backend supports it (#171), else None.""" + ... + def set(self, key: str, value: Union[str, bytes], ttl: Optional[int] = None, **metadata) -> bool: """Set value in cache with TTL and optional metadata.""" ... 
@@ -1266,6 +1322,23 @@ def get(self, key: str, refresh_ttl: Optional[int] = None) -> Optional[bytes]: get_logger().error(f"Unexpected error getting key {key}: {e}") return None + def get_buffer(self, key: str) -> Optional[BufferHandle]: + """Return a zero-copy buffer handle for key if the backend supports it (#171), else None. + + Mirrors get()'s backpressure/timeout wrapping. Returns None when the backend can't map the + value (or on any backend error) so the caller transparently falls back to get(). + """ + if not supports_buffer_read(self.backend): + return None + try: + return self._with_backpressure_and_timeout(self.backend.get_buffer, key) + except BackendError as e: + get_logger().error(f"Backend error mmapping key {key}: {e}") + return None + except Exception as e: + get_logger().error(f"Unexpected error mmapping key {key}: {e}") + return None + def set(self, key: str, value: Union[str, bytes], ttl: Optional[int] = None, **metadata) -> bool: """Set value in cache using backend. diff --git a/src/cachekit/config/settings.py b/src/cachekit/config/settings.py index f26aac8..00b5e19 100644 --- a/src/cachekit/config/settings.py +++ b/src/cachekit/config/settings.py @@ -121,8 +121,9 @@ class CachekitConfig(BaseSettings): description=( "Arrow IPC compression codec for DataFrame caching (ArrowSerializer, compression='auto'). " "'zstd'/'lz4' shrink the stored payload but must be decompressed into the heap on read. " - "'none' stores uncompressed Arrow IPC, which enables zero-copy memory-mapped reads " - "(lowest read memory) at the cost of a larger payload. Env: CACHEKIT_ARROW_COMPRESSION." + "'none' stores uncompressed Arrow IPC, which lets the File backend serve plaintext " + "DataFrame reads via a zero-copy mmap (low steady-state read RSS; peak transiently " + "higher) at the cost of a larger payload. Env: CACHEKIT_ARROW_COMPRESSION." 
), ) retry_on_timeout: bool = Field( diff --git a/src/cachekit/l1_cache.py b/src/cachekit/l1_cache.py index e463f96..2ad3a2c 100644 --- a/src/cachekit/l1_cache.py +++ b/src/cachekit/l1_cache.py @@ -266,7 +266,23 @@ def put( redis_ttl: TTL in seconds from Redis (used to calculate expiry) expires_at: Absolute expiry timestamp (overrides redis_ttl) namespace: Optional namespace for invalidation support + + Raises: + TypeError: if `value` is not exactly `bytes`. L1 stores raw bytes only; a memoryview + (e.g. an mmap-backed view from the File backend) or a mutable bytearray must never + be stored — the former would pin a mapped file's inode for the whole TTL, the + latter could mutate underneath the cache (#171 blocker C). Loud-fail a regression + rather than silently alias. """ + # Runtime guard: the annotation says bytes, but callers reach here across dynamic + # boundaries (backend.get returns, decorator paths) where the type isn't enforced. + if not isinstance(value, bytes): # pyright: ignore[reportUnnecessaryIsInstance] + raise TypeError( + f"L1Cache.put requires bytes, got {type(value).__name__}. " + "Storing a memoryview/bytearray in L1 is forbidden: an mmap-backed view would pin " + "the mapped file for the entry's TTL. Materialize to bytes before caching." + ) + # Calculate expiry time current_time = time.time() if expires_at is not None: diff --git a/src/cachekit/serializers/arrow_serializer.py b/src/cachekit/serializers/arrow_serializer.py index 8233831..61e89d1 100644 --- a/src/cachekit/serializers/arrow_serializer.py +++ b/src/cachekit/serializers/arrow_serializer.py @@ -142,8 +142,10 @@ def __init__(self, return_format: str = "pandas", enable_integrity_checking: boo compression: Arrow IPC compression codec. 
- "auto" (default): use the CACHEKIT_ARROW_COMPRESSION setting (itself "zstd" by default) - "zstd" / "lz4": compress the payload (smaller wire/L1; must be decompressed on read) - - None or "none": store uncompressed Arrow IPC, enabling zero-copy memory-mapped reads - (lowest read memory) at the cost of a larger payload + - None or "none": store uncompressed Arrow IPC. Lets the File backend serve plaintext + DataFrame reads (returned as pandas) via a zero-copy mmap — low steady-state read + RSS (~0.32x), though peak is transiently higher from checksum verification + pandas + materialization — at the cost of a larger stored payload. No effect on wire backends. Raises: ValueError: If return_format or compression is not a valid option @@ -226,7 +228,8 @@ def serialize(self, obj: Any) -> tuple[bytes, SerializationMetadata]: # type: i # writing in bounded batches keeps the compressor's working set bounded (one big # batch makes the codec allocate a full-size working buffer — measured ~3.6x the # payload). Size each batch to ~8 MiB regardless of schema width. compression=None - # writes uncompressed IPC, which a reader can memory-map zero-copy. + # writes uncompressed IPC, which the File backend reads zero-copy via mmap (#171, + # plaintext, pandas return only). 
max_chunksize = _bounded_chunksize(table) sink = pa.BufferOutputStream() write_options = pa.ipc.IpcWriteOptions(compression=self.compression) if self.compression else None diff --git a/tests/performance/test_file_backend_perf.py b/tests/performance/test_file_backend_perf.py index 56566f8..fa015ab 100644 --- a/tests/performance/test_file_backend_perf.py +++ b/tests/performance/test_file_backend_perf.py @@ -15,6 +15,7 @@ from __future__ import annotations +import os import statistics import time from concurrent.futures import ThreadPoolExecutor, as_completed @@ -519,3 +520,47 @@ def _print_stats(indent: str, stats: dict[str, float]) -> None: print(f"{indent}P95: {stats['p95_us']:>10.2f} μs") print(f"{indent}P99: {stats['p99_us']:>10.2f} μs") print(f"{indent}StdDev: {stats['stdev_us']:>10.2f} μs") + + +@pytest.mark.performance +@pytest.mark.slow +@pytest.mark.skipif(os.name != "posix", reason="mmap read path is POSIX-only (#171)") +def test_mmap_read_rss_far_below_full_read(tmp_path: Path) -> None: + """The whole point of #171: get_buffer() must NOT materialize the payload on the heap. + + Mapping a large value and touching a few pages should cost a fraction of the RSS that + os.read of the same value does. A relative comparison (mmap vs full read of the SAME payload + in the SAME process) is far more robust to allocator/GC noise than an absolute RSS bound. + """ + import gc + + import psutil + + proc = psutil.Process() + size = 150 * 1024 * 1024 # 150 MB + backend = FileBackend(FileBackendConfig(cache_dir=tmp_path / "c", max_size_mb=1024, max_value_mb=512)) + backend.set("big", b"\x00" * size, ttl=300) + gc.collect() + + # Full read: os.read pulls the whole payload onto the heap. + base = proc.memory_info().rss + data = backend.get("big") + assert data is not None and len(data) == size + full_read_rss = proc.memory_info().rss - base + del data + gc.collect() + + # mmap read: only the few pages we touch are faulted in. 
+ base = proc.memory_info().rss + handle = backend.get_buffer("big") + assert handle is not None + try: + _ = handle.view[0] + _ = handle.view[len(handle.view) // 2] + _ = handle.view[-1] + mmap_rss = proc.memory_info().rss - base + finally: + handle.close() + + print(f"\n full-read RSS: {full_read_rss / 1e6:.1f} MB mmap RSS: {mmap_rss / 1e6:.1f} MB") + assert mmap_rss < full_read_rss * 0.5, f"mmap RSS {mmap_rss} not < 0.5x full-read RSS {full_read_rss}" diff --git a/tests/unit/backends/test_file_backend.py b/tests/unit/backends/test_file_backend.py index 98ccc67..3c2f0cd 100644 --- a/tests/unit/backends/test_file_backend.py +++ b/tests/unit/backends/test_file_backend.py @@ -1695,3 +1695,75 @@ def test_fd_closed_on_lock_timeout(self, backend: FileBackend, config: FileBacke # Verify we can still open files (no exhaustion) backend.set("final_key", b"final_value") assert backend.get("final_key") == b"final_value" + + +@pytest.mark.unit +class TestMmapBuffer: + """get_buffer(): POSIX zero-copy mmap read of the payload past the 14-byte header. + + Backs the Arrow plaintext-uncompressed read fast path (#171). Returns a handle owning the + mmap + a memoryview slice; the caller must close() it. None => caller falls back to get(). 
+ """ + + def test_get_buffer_roundtrips_payload(self, backend: FileBackend) -> None: + backend.set("k", b"arrow-payload-bytes", ttl=60) + handle = backend.get_buffer("k") + assert handle is not None + try: + assert isinstance(handle.view, memoryview) + assert bytes(handle.view) == b"arrow-payload-bytes" + finally: + handle.close() + + def test_get_buffer_view_is_mmap_backed_not_a_heap_copy(self, backend: FileBackend) -> None: + """The whole point: the view must alias an mmap, not a materialized heap copy.""" + import mmap as _mmap + + backend.set("k", b"z" * 9000, ttl=60) + handle = backend.get_buffer("k") + assert handle is not None + try: + assert isinstance(handle.view.obj, _mmap.mmap) + finally: + handle.close() + + def test_get_buffer_missing_key_returns_none(self, backend: FileBackend) -> None: + assert backend.get_buffer("nonexistent") is None + + def test_get_buffer_oversized_returns_none_for_fallback(self, backend: FileBackend, monkeypatch: pytest.MonkeyPatch) -> None: + """Files larger than the read ceiling fall back to os.read (None), never mapped unbounded.""" + backend.set("big", b"y" * 5000, ttl=60) + monkeypatch.setattr("cachekit.backends.file.backend.MMAP_MAX_BYTES", 1000) + assert backend.get_buffer("big") is None + + def test_get_buffer_expired_returns_none(self, backend: FileBackend) -> None: + with time_machine.travel(0, tick=False) as traveller: + backend.set("k", b"data", ttl=60) + traveller.move_to(120, tick=False) # past the 60s TTL + assert backend.get_buffer("k") is None + + def test_close_is_idempotent_and_releases_the_view(self, backend: FileBackend) -> None: + backend.set("k", b"data", ttl=60) + handle = backend.get_buffer("k") + assert handle is not None + handle.close() + handle.close() # idempotent — no error + with pytest.raises((ValueError, BufferError)): + bytes(handle.view) # mapping released; the view is dead + + @pytest.mark.skipif(os.name != "posix", reason="rename-replace inode pinning is POSIX-only") + def 
test_open_mmap_unaffected_by_concurrent_set(self, backend: FileBackend) -> None: + """The load-bearing invariant: a set() that renames over the key while a reader holds the + mmap must NOT change what the reader sees. POSIX keeps the old (now-unlinked) inode alive + as long as the mapping exists, so the reader gets a stable snapshot. This is why set() must + stay rename-replace-only — any in-place write would corrupt live mmap readers.""" + backend.set("k", b"OLD-PAYLOAD", ttl=300) + handle = backend.get_buffer("k") + assert handle is not None + try: + assert bytes(handle.view) == b"OLD-PAYLOAD" + backend.set("k", b"NEW-PAYLOAD-IS-LONGER", ttl=300) # atomic rename over the same key + assert bytes(handle.view) == b"OLD-PAYLOAD" # mapping still sees the old snapshot + finally: + handle.close() + assert backend.get("k") == b"NEW-PAYLOAD-IS-LONGER" # a fresh read sees the new value diff --git a/tests/unit/test_l1_memory_bounds.py b/tests/unit/test_l1_memory_bounds.py index 563190c..c50e885 100644 --- a/tests/unit/test_l1_memory_bounds.py +++ b/tests/unit/test_l1_memory_bounds.py @@ -81,3 +81,31 @@ def test_non_finite_expires_at_not_stored(self, bad_ttl): cache.put("k", b"value", expires_at=bad_ttl) assert cache.get("k")[0] is False assert cache._current_memory_bytes == 0 + + +@pytest.mark.unit +class TestNonBytesRejection: + """#171 blocker C belt-and-suspenders: L1 stores raw bytes ONLY. + + An mmap-backed memoryview must never reach L1 — it would pin the mapped file's inode for the + whole L1 TTL (silent staleness on POSIX, write failures on Windows, RSS blowup under hot keys). + The mmap read path confines the view to the deserialize frame, but a future refactor could + regress; this guard makes that regression a loud TypeError instead of a silent alias. bytearray + is rejected too (a mutable buffer could change underneath the cache). 
+ """ + + def test_put_rejects_memoryview(self): + cache = L1Cache(max_memory_mb=10) + with pytest.raises(TypeError): + cache.put("k", memoryview(b"data"), redis_ttl=300) # type: ignore[arg-type] + assert cache.get("k")[0] is False + + def test_put_rejects_bytearray(self): + cache = L1Cache(max_memory_mb=10) + with pytest.raises(TypeError): + cache.put("k", bytearray(b"data"), redis_ttl=300) # type: ignore[arg-type] + + def test_put_accepts_bytes(self): + cache = L1Cache(max_memory_mb=10) + cache.put("k", b"data", redis_ttl=300) + assert cache.get("k")[0] is True diff --git a/tests/unit/test_mmap_read_path.py b/tests/unit/test_mmap_read_path.py new file mode 100644 index 0000000..3991eca --- /dev/null +++ b/tests/unit/test_mmap_read_path.py @@ -0,0 +1,159 @@ +"""#171 mmap read-path wiring. + +Three units compose the zero-copy Arrow read fast path: +- ``CacheSerializationHandler.supports_mmap_read()`` — eligibility (plaintext Arrow -> pandas). +- ``StandardCacheHandler.get_buffer()`` — delegates to a backend that exposes ``get_buffer``. +- ``CacheOperationHandler.get_cached_value(_async)`` — when eligible, reads via the mmap handle, + deserializes over the view, and closes the handle in a ``finally``. Crucially the mmap NEVER + becomes the value the decorator holds (it returns the deserialized object), so it can't reach + ``_l1_cache.put`` (blocker C). 
+""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from cachekit.cache_handler import ( + CacheOperationHandler, + CacheSerializationHandler, + StandardCacheHandler, +) + + +@pytest.mark.unit +class TestSupportsMmapRead: + def test_arrow_pandas_plaintext_is_eligible(self) -> None: + sh = CacheSerializationHandler(serializer_name="arrow") + assert sh.supports_mmap_read() is True + + def test_default_serializer_not_eligible(self) -> None: + sh = CacheSerializationHandler(serializer_name="default") + assert sh.supports_mmap_read() is False + + def test_encrypted_arrow_not_eligible(self) -> None: + """Encrypted values can never mmap — AES-GCM decrypt owns its buffer.""" + sh = CacheSerializationHandler(serializer_name="arrow") + sh.encryption = True + assert sh.supports_mmap_read() is False + + def test_arrow_return_format_not_eligible(self) -> None: + """A pyarrow.Table aliases the mmap; closing the handle would be a use-after-free. Pandas only.""" + from cachekit.serializers.arrow_serializer import ArrowSerializer + + sh = CacheSerializationHandler(serializer_name="arrow") + serializer = sh._base_serializer + assert isinstance(serializer, ArrowSerializer) # narrows type so .return_format is valid + # get_serializer caches one ArrowSerializer process-wide; restore after mutating so later + # tests don't get a serializer that returns a Table instead of a DataFrame. 
+ original = serializer.return_format + serializer.return_format = "arrow" + try: + assert sh.supports_mmap_read() is False + finally: + serializer.return_format = original + + +@pytest.mark.unit +class TestStandardCacheHandlerGetBuffer: + def test_delegates_to_backend_when_supported(self) -> None: + backend = MagicMock() + handle = object() + backend.get_buffer.return_value = handle + ch = StandardCacheHandler(backend) + assert ch.get_buffer("k") is handle + backend.get_buffer.assert_called_once_with("k") + + def test_returns_none_when_backend_lacks_get_buffer(self) -> None: + class NoBufferBackend: + def get(self, key, refresh_ttl=None): + return None + + ch = StandardCacheHandler(NoBufferBackend()) # type: ignore[arg-type] + assert ch.get_buffer("k") is None + + +@pytest.mark.unit +class TestGetCachedValueMmapBranch: + @staticmethod + def _handler(sh: MagicMock, ch: MagicMock) -> CacheOperationHandler: + return CacheOperationHandler(sh, MagicMock(), cache_handler=ch) + + def test_eligible_reads_via_mmap_and_confines_the_handle(self) -> None: + sentinel = object() + sh = MagicMock() + sh.supports_mmap_read.return_value = True + sh.deserialize_data.return_value = sentinel + handle = MagicMock() + ch = MagicMock() + ch.get_buffer.return_value = handle + + result = self._handler(sh, ch).get_cached_value("k") + + assert result == (True, sentinel) + ch.get_buffer.assert_called_once_with("k") + ch.get.assert_not_called() # normal read path NOT used on the mmap hit + sh.deserialize_data.assert_called_once_with(handle.view, "k") + handle.close.assert_called_once() # mmap released in finally, never escapes the frame + + def test_not_eligible_uses_normal_read_path(self) -> None: + sh = MagicMock() + sh.supports_mmap_read.return_value = False + ch = MagicMock() + ch.get.return_value = None # miss + + self._handler(sh, ch).get_cached_value("k") + + ch.get_buffer.assert_not_called() + ch.get.assert_called_once() + + def test_get_buffer_none_falls_through_to_normal_read(self) 
-> None: + sh = MagicMock() + sh.supports_mmap_read.return_value = True + sh.deserialize_data.return_value = "val" + ch = MagicMock() + ch.get_buffer.return_value = None # file ineligible (non-posix / too big / missing) + ch.get.return_value = b"frame" + + result = self._handler(sh, ch).get_cached_value("k") + + ch.get_buffer.assert_called_once() + ch.get.assert_called_once() # fell through + assert result == (True, "val") + + +@pytest.mark.unit +class TestMmapReadEndToEnd: + """Real Arrow + File backend through the full handler stack: the mmap path is actually taken + (not the os.read path) and the DataFrame round-trips intact.""" + + def test_arrow_dataframe_roundtrips_through_real_mmap(self, tmp_path) -> None: + pd = pytest.importorskip("pandas") + pytest.importorskip("pyarrow") + from unittest.mock import patch + + from cachekit.backends.file.backend import FileBackend + from cachekit.backends.file.config import FileBackendConfig + from cachekit.key_generator import CacheKeyGenerator + + sh = CacheSerializationHandler(serializer_name="arrow") + backend = FileBackend(FileBackendConfig(cache_dir=tmp_path / "c", max_size_mb=64, max_value_mb=32)) + ch = StandardCacheHandler(backend) + oh = CacheOperationHandler(sh, CacheKeyGenerator(), cache_handler=ch) + + df = pd.DataFrame({"a": range(2000), "b": [float(i) / 3 for i in range(2000)]}) + ch.set("k", sh.serialize_data(df, cache_key="k"), 300) + + with ( + patch.object(backend, "get", wraps=backend.get) as g, + patch.object(backend, "get_buffer", wraps=backend.get_buffer) as gb, + ): + hit = oh.get_cached_value("k") + + assert hit is not None + found, value = hit + assert found is True + pd.testing.assert_frame_equal(value, df) + gb.assert_called_once() # the real mmap path was taken + g.assert_not_called() # not the os.read fallback From 037a545ef74a3eb9b03384be6c9e718934994a04 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Wed, 17 Jun 2026 23:53:24 +1000 Subject: [PATCH 2/4] test: cover mmap read-path branches; 
drop dead async mmap branch - Remove the speculative mmap branch from get_cached_value_async: it has no caller (the async decorator path inlines get_async), so it was dead code and the only thing dragging diff coverage below the 80% patch gate. YAGNI. - get_buffer branch tests: corrupt magic, empty payload, truncated file, unexpected OSError -> BackendError, and the rename-under-mmap invariant. - StandardCacheHandler.get_buffer error-path tests (BackendError + generic). - pragma: no cover on the genuinely untestable-on-Linux defensive branches (POSIX guard, rare open errors, idempotent-close excepts, mid-map cleanup). - Test the arrow return_format gate with a throwaway serializer instead of mutating the process-wide cached instance (addresses CodeRabbit nit). --- src/cachekit/backends/file/backend.py | 14 +++++----- src/cachekit/cache_handler.py | 15 +++-------- tests/unit/backends/test_file_backend.py | 34 ++++++++++++++++++++++++ tests/unit/test_mmap_read_path.py | 30 ++++++++++++++------- 4 files changed, 64 insertions(+), 29 deletions(-) diff --git a/src/cachekit/backends/file/backend.py b/src/cachekit/backends/file/backend.py index 24aac1c..071577c 100644 --- a/src/cachekit/backends/file/backend.py +++ b/src/cachekit/backends/file/backend.py @@ -75,12 +75,12 @@ def close(self) -> None: """Release the view then the mapping. 
Idempotent (safe to call more than once).""" try: self.view.release() # must release exports before mmap.close(), else BufferError - except (ValueError, BufferError): - pass # already released + except (ValueError, BufferError): # pragma: no cover - defensive (already released / lingering sub-export) + pass try: self._mm.close() - except (ValueError, BufferError): - pass # already closed + except (ValueError, BufferError): # pragma: no cover - defensive (already closed) + pass class FileBackend: @@ -239,7 +239,7 @@ def get_buffer(self, key: str) -> _MmapHandle | None: by path and would follow an attacker-swapped symlink, reintroducing the TOCTOU that ``O_NOFOLLOW`` closes. The mapping survives the fd close on POSIX. """ - if os.name != "posix": + if os.name != "posix": # pragma: no cover - Windows-only branch; CI is Linux return None # mapped files can't be renamed/unlinked on Windows; caller uses get() file_path = self._key_to_path(key) @@ -249,7 +249,7 @@ def get_buffer(self, key: str) -> _MmapHandle | None: fd = os.open(file_path, os.O_RDONLY | os.O_NOFOLLOW) except FileNotFoundError: return None - except OSError as exc: + except OSError as exc: # pragma: no cover - rare open errors (ELOOP/EACCES); defensive if exc.errno in (errno.ENOENT, errno.ELOOP): return None # missing, or symlink rejected by O_NOFOLLOW raise BackendError( @@ -299,7 +299,7 @@ def get_buffer(self, key: str) -> _MmapHandle | None: key=key, ) from exc finally: - if mm is not None: + if mm is not None: # pragma: no cover - only on a mid-map exception; ownership normally moved to the handle mm.close() os.close(fd) diff --git a/src/cachekit/cache_handler.py b/src/cachekit/cache_handler.py index 62a50dd..7dc5271 100644 --- a/src/cachekit/cache_handler.py +++ b/src/cachekit/cache_handler.py @@ -926,18 +926,9 @@ async def get_cached_value_async(self, cache_key: str, refresh_ttl: Optional[int if self._cache_handler is None: raise RuntimeError("Cache handler must be set before calling 
get_cached_value_async") - # mmap fast path (#171): same as the sync path. mmap setup is a fast local syscall and - # the existing async path already deserializes synchronously here, so this adds no new - # event-loop blocking. The handle is confined to this frame; the mmap never reaches L1. - if self.serialization_handler.supports_mmap_read(): - handle = self._cache_handler.get_buffer(cache_key) - if handle is not None: - try: - get_logger().cache_hit(cache_key, "Backend(mmap)") - return (True, self.serialization_handler.deserialize_data(handle.view, cache_key)) - finally: - handle.close() - + # NOTE: no mmap fast path here. The async decorator path inlines get_async (it does not + # route through this method today), so an mmap branch would be dead code. The mmap read + # lives on the sync get_cached_value; add it here only when an async caller routes through. cached_data = await self._cache_handler.get_async(cache_key, refresh_ttl) if cached_data is not None: get_logger().cache_hit(cache_key, "Backend") diff --git a/tests/unit/backends/test_file_backend.py b/tests/unit/backends/test_file_backend.py index 3c2f0cd..7d2c7ef 100644 --- a/tests/unit/backends/test_file_backend.py +++ b/tests/unit/backends/test_file_backend.py @@ -1767,3 +1767,37 @@ def test_open_mmap_unaffected_by_concurrent_set(self, backend: FileBackend) -> N finally: handle.close() assert backend.get("k") == b"NEW-PAYLOAD-IS-LONGER" # a fresh read sees the new value + + def test_get_buffer_corrupt_magic_returns_none_and_unlinks(self, backend: FileBackend) -> None: + backend.set("k", b"payload-data-here", ttl=300) + path = Path(backend._key_to_path("k")) + raw = bytearray(path.read_bytes()) + raw[0:2] = b"XX" # clobber the CK magic + path.write_bytes(bytes(raw)) + assert backend.get_buffer("k") is None + assert not path.exists() # corrupt entry is unlinked + + def test_get_buffer_empty_payload_returns_none(self, backend: FileBackend) -> None: + backend.set("k", b"", ttl=300) # header only, no payload 
to map + assert backend.get_buffer("k") is None + + def test_get_buffer_truncated_file_returns_none_and_unlinks(self, backend: FileBackend) -> None: + path = Path(backend._key_to_path("trunc")) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(b"CK\x01") # shorter than the 14-byte header + assert backend.get_buffer("trunc") is None + assert not path.exists() + + def test_get_buffer_wraps_unexpected_oserror_as_backend_error( + self, backend: FileBackend, monkeypatch: pytest.MonkeyPatch + ) -> None: + backend.set("k", b"x" * 5000, ttl=300) + import cachekit.backends.file.backend as backend_mod + from cachekit.backends.errors import BackendError + + def boom(*_a, **_k): + raise OSError(errno.EIO, "simulated I/O error") + + monkeypatch.setattr(backend_mod.mmap, "mmap", boom) + with pytest.raises(BackendError): + backend.get_buffer("k") diff --git a/tests/unit/test_mmap_read_path.py b/tests/unit/test_mmap_read_path.py index 3991eca..3b8289b 100644 --- a/tests/unit/test_mmap_read_path.py +++ b/tests/unit/test_mmap_read_path.py @@ -43,16 +43,10 @@ def test_arrow_return_format_not_eligible(self) -> None: from cachekit.serializers.arrow_serializer import ArrowSerializer sh = CacheSerializationHandler(serializer_name="arrow") - serializer = sh._base_serializer - assert isinstance(serializer, ArrowSerializer) # narrows type so .return_format is valid - # get_serializer caches one ArrowSerializer process-wide; restore after mutating so later - # tests don't get a serializer that returns a Table instead of a DataFrame. - original = serializer.return_format - serializer.return_format = "arrow" - try: - assert sh.supports_mmap_read() is False - finally: - serializer.return_format = original + # Swap in a throwaway serializer rather than mutating the process-wide cached instance, so + # this test can never leak an "arrow" return_format into another test (no shared state). 
+ sh._base_serializer = ArrowSerializer(return_format="arrow") + assert sh.supports_mmap_read() is False @pytest.mark.unit @@ -73,6 +67,22 @@ def get(self, key, refresh_ttl=None): ch = StandardCacheHandler(NoBufferBackend()) # type: ignore[arg-type] assert ch.get_buffer("k") is None + def test_returns_none_on_backend_error(self) -> None: + """A backend error during get_buffer must degrade to None so the caller falls back to get().""" + from cachekit.backends.errors import BackendError + + backend = MagicMock() + backend.get_buffer.side_effect = BackendError("boom") + ch = StandardCacheHandler(backend) + assert ch.get_buffer("k") is None + + def test_returns_none_on_unexpected_error(self) -> None: + """A non-BackendError exception must also degrade to None (mirrors get()'s catch-all).""" + backend = MagicMock() + backend.get_buffer.side_effect = RuntimeError("unexpected") + ch = StandardCacheHandler(backend) + assert ch.get_buffer("k") is None + @pytest.mark.unit class TestGetCachedValueMmapBranch: From 0ed9be952c61a25d748e6cf5b577bc7c93300616 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Thu, 18 Jun 2026 09:12:31 +1000 Subject: [PATCH 3/4] ci: gate memory invariants (mmap RSS + large-object allocations) on every run Run the deterministic `performance and slow` tests in CI: the #171 mmap zero-copy read (RSS stays flat) and the #152 large-object allocation caps. They were excluded by the blanket `-m "not slow"` filter despite being reproducible resource invariants, not wall-clock benchmarks. ~5s, run once on 3.12 (version-independent). Flaky timing/throughput benchmarks stay `performance` WITHOUT `slow` and remain out of CI. 
--- .github/workflows/ci.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2669fb6..3f2bf5b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -119,6 +119,14 @@ jobs: --junitxml=junit.xml \ -o junit_family=legacy + # Deterministic memory invariants (RSS/allocation bounds), not wall-clock benchmarks. These + # are `performance and slow` — the mmap zero-copy read (#171) and the large-object allocation + # caps (#152). Flaky timing/throughput benchmarks are `performance` WITHOUT `slow` and stay out + # of CI. Version-independent, so run once on 3.12 (~5s) rather than across the push matrix. + - name: Run memory-invariant tests + if: matrix.python-version == '3.12' + run: uv run pytest tests/performance/ -m "performance and slow" -q + - name: Upload coverage to Codecov if: ${{ !cancelled() }} uses: codecov/codecov-action@1af58845a975a7985b0beb0cbe6fbbb71a41dbad # v5 From 7619dc9cad7155bf143bfabe5d6d6ad1c3b87624 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Thu, 18 Jun 2026 09:18:14 +1000 Subject: [PATCH 4/4] ci: pass REDIS_URL to the memory-invariant step The autouse redis-isolation fixture (tests/conftest.py) spawns a local redis-server binary unless REDIS_URL is set; the CI runner has no such binary (it uses dockerized Redis), so the step ERRORed at fixture setup. Mirror the unit/critical steps and point it at the external Redis. --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3f2bf5b..f5bb970 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -125,6 +125,8 @@ jobs: # of CI. Version-independent, so run once on 3.12 (~5s) rather than across the push matrix. 
- name: Run memory-invariant tests if: matrix.python-version == '3.12' + env: + REDIS_URL: redis://localhost:6379 # autouse redis-isolation fixture uses external Redis when set (else spawns a binary the runner lacks) run: uv run pytest tests/performance/ -m "performance and slow" -q - name: Upload coverage to Codecov