Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ jobs:
--junitxml=junit.xml \
-o junit_family=legacy

# Deterministic memory invariants (RSS/allocation bounds), not wall-clock benchmarks. These
# are `performance and slow` — the mmap zero-copy read (#171) and the large-object allocation
# caps (#152). Flaky timing/throughput benchmarks are `performance` WITHOUT `slow` and stay out
# of CI. Version-independent, so run once on 3.12 (~5s) rather than across the push matrix.
- name: Run memory-invariant tests
if: matrix.python-version == '3.12'
env:
REDIS_URL: redis://localhost:6379 # autouse redis-isolation fixture uses external Redis when set (else spawns a binary the runner lacks)
run: uv run pytest tests/performance/ -m "performance and slow" -q

- name: Upload coverage to Codecov
if: ${{ !cancelled() }}
uses: codecov/codecov-action@1af58845a975a7985b0beb0cbe6fbbb71a41dbad # v5
Expand Down
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@
"filename": "src/cachekit/cache_handler.py",
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
"is_verified": false,
"line_number": 257
"line_number": 266
}
],
"src/cachekit/config/decorator.py": [
Expand Down Expand Up @@ -425,5 +425,5 @@
}
]
},
"generated_at": "2026-06-11T01:10:45Z"
"generated_at": "2026-06-17T13:30:40Z"
}
35 changes: 35 additions & 0 deletions src/cachekit/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,41 @@ async def refresh_ttl(self, key: str, ttl: int) -> bool:
...


@runtime_checkable
class BufferHandle(Protocol):
"""A borrowed, zero-copy view of a cached value plus the resource backing it.

Returned by ``BufferReadableBackend.get_buffer``. ``view`` aliases backend-owned memory (e.g.
mmap'd file pages), not a heap copy — so the consumer must finish reading and ``close()``
before the view is touched again. The view DANGLES after close (touching it can segfault), so
it must never be stored (e.g. in L1) nor returned past the read call frame (#171).
"""

view: memoryview
"""Zero-copy view of the payload (valid only until close())."""

def close(self) -> None:
"""Release the view and its backing resource. Idempotent."""
...


@runtime_checkable
class BufferReadableBackend(Protocol):
"""Optional protocol for backends that can return a zero-copy buffer instead of materializing
the whole value on the heap.

Lets large plaintext values (e.g. uncompressed Arrow IPC) be read without copying the payload.
Only the File backend implements this today (mmap, POSIX). Backends that don't implement it
are simply read via ``get`` as usual.
"""

def get_buffer(self, key: str) -> Optional[BufferHandle]:
"""Return a borrowed zero-copy handle for ``key``, or None when the value is not mappable
(missing/expired/too large/non-POSIX) — the caller then falls back to ``get``. The caller
MUST ``close()`` the handle when done reading."""
...


@runtime_checkable
class LockableBackend(Protocol):
"""Optional protocol for backends supporting distributed locking.
Expand Down
114 changes: 114 additions & 0 deletions src/cachekit/backends/file/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import errno
import hashlib
import mmap
import os
import platform
import struct
Expand Down Expand Up @@ -47,6 +48,40 @@
# TTL bounds (security: prevent integer overflow)
MAX_TTL_SECONDS: int = 10 * 365 * 24 * 60 * 60 # 10 years max

# Read-side mmap ceiling (#171, fork 4): a fixed internal cap independent of max_value_mb so a
# misconfigured huge max_value_mb (or an out-of-band file dropped in cache_dir) can't map an
# unbounded region. Above this, get_buffer() returns None and the caller falls back to os.read.
MMAP_MAX_BYTES: int = 512 * 1024 * 1024 # 512 MB


class _MmapHandle:
"""Owns a read-only mmap of a cache file plus a memoryview of its payload (past the 14-byte
header). Zero-copy: the view aliases mapped pages, never a heap copy.

The CALLER must ``close()`` once the consumer is done (after Arrow deserialize has copied the
data out via to_pandas). The view DANGLES after close — touching it segfaults — so the handle
must never escape the deserialize call frame and must never be stored in L1 (#171 blocker C).
"""

__slots__ = ("_mm", "view")

def __init__(self, mm: mmap.mmap) -> None:
self._mm = mm
# Slice past the 14-byte header; the slice exports its buffer directly from `mm`, so it is
# the only export to release before mm.close().
self.view: memoryview = memoryview(mm)[HEADER_SIZE:]

def close(self) -> None:
"""Release the view then the mapping. Idempotent (safe to call more than once)."""
try:
self.view.release() # must release exports before mmap.close(), else BufferError
except (ValueError, BufferError): # pragma: no cover - defensive (already released / lingering sub-export)
pass
try:
self._mm.close()
except (ValueError, BufferError): # pragma: no cover - defensive (already closed)
pass


class FileBackend:
"""File-based backend for local disk caching.
Expand Down Expand Up @@ -189,6 +224,85 @@ def get(self, key: str) -> bytes | None:
key=key,
) from exc

def get_buffer(self, key: str) -> _MmapHandle | None:
"""Memory-map a cache value for a zero-copy read of its payload (POSIX only; #171).

Returns an `_MmapHandle` owning the mmap + a memoryview of the payload (past the 14-byte
header), or `None` when mmap does not apply — the caller then falls back to `get()`:
- non-POSIX platform (Windows pins mapped files against rename/unlink);
- missing / expired / corrupt entry (corrupt + expired are unlinked, mirroring `get`);
- empty payload (nothing to map);
- file larger than ``MMAP_MAX_BYTES``.

Security: the fd is opened with ``O_NOFOLLOW`` and the header is validated from the fd
BEFORE mapping. We never use ``pa.memory_map(path)`` / a path-based mmap — that re-opens
by path and would follow an attacker-swapped symlink, reintroducing the TOCTOU that
``O_NOFOLLOW`` closes. The mapping survives the fd close on POSIX.
"""
if os.name != "posix": # pragma: no cover - Windows-only branch; CI is Linux
return None # mapped files can't be renamed/unlinked on Windows; caller uses get()

file_path = self._key_to_path(key)

with self._lock:
try:
fd = os.open(file_path, os.O_RDONLY | os.O_NOFOLLOW)
except FileNotFoundError:
return None
except OSError as exc: # pragma: no cover - rare open errors (ELOOP/EACCES); defensive
if exc.errno in (errno.ENOENT, errno.ELOOP):
return None # missing, or symlink rejected by O_NOFOLLOW
raise BackendError(
f"Failed to open cache file for mmap: {exc}",
error_type=self._classify_os_error(exc, is_directory=False),
original_exception=exc,
operation="get_buffer",
key=key,
) from exc

mm: mmap.mmap | None = None
try:
self._acquire_file_lock(fd, exclusive=False)
try:
st_size = os.fstat(fd).st_size

# Validate-then-map: never map a file we're about to delete.
if st_size < HEADER_SIZE:
self._safe_unlink(file_path)
return None
header = os.read(fd, HEADER_SIZE)
if header[0:2] != MAGIC or header[2] != FORMAT_VERSION:
self._safe_unlink(file_path)
return None
expiry_timestamp = struct.unpack(">Q", header[6:14])[0]
if expiry_timestamp > 0 and time.time() > expiry_timestamp:
self._safe_unlink(file_path)
return None

# Empty payload (header only): nothing to map (mmap rejects length 0 anyway).
# Too large: fall back to os.read so we never map an unbounded region.
if st_size <= HEADER_SIZE or st_size > MMAP_MAX_BYTES:
return None

mm = mmap.mmap(fd, st_size, access=mmap.ACCESS_READ)
handle = _MmapHandle(mm)
mm = None # ownership transferred to the handle; don't close it in finally
return handle
finally:
self._release_file_lock(fd)
except OSError as exc:
raise BackendError(
f"Failed to mmap cache file: {exc}",
error_type=self._classify_os_error(exc, is_directory=False),
original_exception=exc,
operation="get_buffer",
key=key,
) from exc
finally:
if mm is not None: # pragma: no cover - only on a mid-map exception; ownership normally moved to the handle
mm.close()
os.close(fd)

def set(self, key: str, value: bytes, ttl: int | None = None) -> None:
"""Store value in file storage with atomic write.

Expand Down
68 changes: 66 additions & 2 deletions src/cachekit/cache_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Optional, Protocol, TypeGuard, Union, runtime_checkable

from cachekit.backends.base import BackendError, BaseBackend, TTLInspectableBackend
from cachekit.backends.base import BackendError, BaseBackend, BufferHandle, BufferReadableBackend, TTLInspectableBackend
from cachekit.backends.provider import (
BackendProviderInterface,
DefaultBackendProvider,
Expand Down Expand Up @@ -85,6 +85,15 @@ def supports_ttl_inspection(backend: BaseBackend) -> TypeGuard[TTLInspectableBac
return hasattr(backend, "get_ttl") and hasattr(backend, "refresh_ttl")


def supports_buffer_read(backend: BaseBackend) -> TypeGuard[BufferReadableBackend]:
"""Type guard: backend can return a zero-copy buffer via get_buffer (#171, File/POSIX only).

Returns:
True if backend implements BufferReadableBackend (used for the mmap Arrow read fast path).
"""
return hasattr(backend, "get_buffer")


# Import caching for serializer modules
#
# PERFORMANCE OPTIMIZATION: Dynamic imports are expensive (~100μs per import)
Expand Down Expand Up @@ -640,7 +649,25 @@ def serialize_data(
get_logger().error(f"Serialization failed with {self.serializer_name}: {e}")
raise SerializationError(f"Failed to serialize data with {self.serializer_name}: {e}") from e

def deserialize_data(self, data: str | bytes, cache_key: str = "") -> Any:
def supports_mmap_read(self) -> bool:
"""True iff reads can use the zero-copy mmap fast path (#171).

Eligible only for PLAINTEXT Arrow that returns pandas:
- encrypted values can never mmap (AES-GCM decrypt owns its buffer);
- non-Arrow serializers gain nothing (they copy at the Rust/C boundary, rebuild objects);
- the "arrow" return_format yields a table that ALIASES the mapped pages, so closing the
handle would be a use-after-free — pandas (which copies out via to_pandas) only.

The backend must also support buffer reads (File/POSIX); that is checked separately, so a
True here on a non-File backend simply means get_buffer returns None and we fall back.
"""
return (
not self.encryption
and self._serializer_string_name == "arrow"
and getattr(self._base_serializer, "return_format", None) == "pandas"
)

def deserialize_data(self, data: str | bytes | memoryview, cache_key: str = "") -> Any:
"""Deserialize data from cache storage with cache_key verification.

Args:
Expand Down Expand Up @@ -845,6 +872,19 @@ def get_cached_value(self, cache_key: str, refresh_ttl: Optional[int] = None) ->
if self._cache_handler is None:
raise RuntimeError("Cache handler must be set before calling get_cached_value")

# mmap fast path (#171): plaintext Arrow -> pandas on a buffer-readable backend (File,
# POSIX) reads zero-copy. The handle is confined to this frame and closed in `finally`,
# so the mmap never becomes the returned value and never reaches L1 (blocker C). A None
# from get_buffer (ineligible file, or a non-buffer backend) falls through to bytes.
if self.serialization_handler.supports_mmap_read():
handle = self._cache_handler.get_buffer(cache_key)
if handle is not None:
try:
get_logger().cache_hit(cache_key, "Backend(mmap)")
return (True, self.serialization_handler.deserialize_data(handle.view, cache_key))
finally:
handle.close()

cached_data = self._cache_handler.get(cache_key, refresh_ttl)
if cached_data is not None:
get_logger().cache_hit(cache_key, "Backend")
Expand Down Expand Up @@ -886,6 +926,9 @@ async def get_cached_value_async(self, cache_key: str, refresh_ttl: Optional[int
if self._cache_handler is None:
raise RuntimeError("Cache handler must be set before calling get_cached_value_async")

# NOTE: no mmap fast path here. The async decorator path inlines get_async (it does not
# route through this method today), so an mmap branch would be dead code. The mmap read
# lives on the sync get_cached_value; add it here only when an async caller routes through.
cached_data = await self._cache_handler.get_async(cache_key, refresh_ttl)
if cached_data is not None:
get_logger().cache_hit(cache_key, "Backend")
Expand Down Expand Up @@ -1100,6 +1143,10 @@ def get(self, key: str, refresh_ttl: Optional[int] = None) -> Optional[bytes]:
"""Get value from cache with optional TTL refresh."""
...

def get_buffer(self, key: str) -> Optional[BufferHandle]:
"""Return a zero-copy buffer handle if the backend supports it (#171), else None."""
...

def set(self, key: str, value: Union[str, bytes], ttl: Optional[int] = None, **metadata) -> bool:
"""Set value in cache with TTL and optional metadata."""
...
Expand Down Expand Up @@ -1266,6 +1313,23 @@ def get(self, key: str, refresh_ttl: Optional[int] = None) -> Optional[bytes]:
get_logger().error(f"Unexpected error getting key {key}: {e}")
return None

def get_buffer(self, key: str) -> Optional[BufferHandle]:
"""Return a zero-copy buffer handle for key if the backend supports it (#171), else None.

Mirrors get()'s backpressure/timeout wrapping. Returns None when the backend can't map the
value (or on any backend error) so the caller transparently falls back to get().
"""
if not supports_buffer_read(self.backend):
return None
try:
return self._with_backpressure_and_timeout(self.backend.get_buffer, key)
except BackendError as e:
get_logger().error(f"Backend error mmapping key {key}: {e}")
return None
except Exception as e:
get_logger().error(f"Unexpected error mmapping key {key}: {e}")
return None

def set(self, key: str, value: Union[str, bytes], ttl: Optional[int] = None, **metadata) -> bool:
"""Set value in cache using backend.

Expand Down
5 changes: 3 additions & 2 deletions src/cachekit/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ class CachekitConfig(BaseSettings):
description=(
"Arrow IPC compression codec for DataFrame caching (ArrowSerializer, compression='auto'). "
"'zstd'/'lz4' shrink the stored payload but must be decompressed into the heap on read. "
"'none' stores uncompressed Arrow IPC, which enables zero-copy memory-mapped reads "
"(lowest read memory) at the cost of a larger payload. Env: CACHEKIT_ARROW_COMPRESSION."
"'none' stores uncompressed Arrow IPC, which lets the File backend serve plaintext "
"DataFrame reads via a zero-copy mmap (low steady-state read RSS; peak transiently "
"higher) at the cost of a larger payload. Env: CACHEKIT_ARROW_COMPRESSION."
),
)
retry_on_timeout: bool = Field(
Expand Down
16 changes: 16 additions & 0 deletions src/cachekit/l1_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,23 @@ def put(
redis_ttl: TTL in seconds from Redis (used to calculate expiry)
expires_at: Absolute expiry timestamp (overrides redis_ttl)
namespace: Optional namespace for invalidation support

Raises:
TypeError: if `value` is not exactly `bytes`. L1 stores raw bytes only; a memoryview
(e.g. an mmap-backed view from the File backend) or a mutable bytearray must never
be stored — the former would pin a mapped file's inode for the whole TTL, the
latter could mutate underneath the cache (#171 blocker C). Loud-fail a regression
rather than silently alias.
"""
# Runtime guard: the annotation says bytes, but callers reach here across dynamic
# boundaries (backend.get returns, decorator paths) where the type isn't enforced.
if not isinstance(value, bytes): # pyright: ignore[reportUnnecessaryIsInstance]
raise TypeError(
f"L1Cache.put requires bytes, got {type(value).__name__}. "
"Storing a memoryview/bytearray in L1 is forbidden: an mmap-backed view would pin "
"the mapped file for the entry's TTL. Materialize to bytes before caching."
)

# Calculate expiry time
current_time = time.time()
if expires_at is not None:
Expand Down
9 changes: 6 additions & 3 deletions src/cachekit/serializers/arrow_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ def __init__(self, return_format: str = "pandas", enable_integrity_checking: boo
compression: Arrow IPC compression codec.
- "auto" (default): use the CACHEKIT_ARROW_COMPRESSION setting (itself "zstd" by default)
- "zstd" / "lz4": compress the payload (smaller wire/L1; must be decompressed on read)
- None or "none": store uncompressed Arrow IPC, enabling zero-copy memory-mapped reads
(lowest read memory) at the cost of a larger payload
- None or "none": store uncompressed Arrow IPC. Lets the File backend serve plaintext
DataFrame reads (returned as pandas) via a zero-copy mmap — low steady-state read
RSS (~0.32x), though peak is transiently higher from checksum verification + pandas
materialization — at the cost of a larger stored payload. No effect on wire backends.

Raises:
ValueError: If return_format or compression is not a valid option
Expand Down Expand Up @@ -226,7 +228,8 @@ def serialize(self, obj: Any) -> tuple[bytes, SerializationMetadata]: # type: i
# writing in bounded batches keeps the compressor's working set bounded (one big
# batch makes the codec allocate a full-size working buffer — measured ~3.6x the
# payload). Size each batch to ~8 MiB regardless of schema width. compression=None
# writes uncompressed IPC, which a reader can memory-map zero-copy.
# writes uncompressed IPC, which the File backend reads zero-copy via mmap (#171,
# plaintext, pandas return only).
max_chunksize = _bounded_chunksize(table)
sink = pa.BufferOutputStream()
write_options = pa.ipc.IpcWriteOptions(compression=self.compression) if self.compression else None
Expand Down
Loading
Loading