Commit 857cbca

Support pickling CachingReadableStore (#36)

1 parent d4b3e4d commit 857cbca

3 files changed

Lines changed: 312 additions & 0 deletions

docs/caching-architecture.md

Lines changed: 111 additions & 0 deletions
@@ -235,6 +235,117 @@ For workloads requiring cross-worker cache sharing, consider:
- Shared filesystem caching
- Restructuring workloads to minimize cross-worker file access

### Pickling and Serialization

`CachingReadableStore` supports Python's pickle protocol for use with multiprocessing and distributed frameworks. When a `CachingReadableStore` is pickled and unpickled (e.g., sent to a worker process), it is **recreated with an empty cache**.

```python
import pickle
from obspec_utils.cache import CachingReadableStore

# Main process: create and populate cache
# (`store` is any picklable readable store, e.g. an S3Store)
cached_store = CachingReadableStore(store, max_size=256 * 1024 * 1024)
cached_store.get("file1.nc")  # Cached
cached_store.get("file2.nc")  # Cached
print(cached_store.cache_size)  # Non-zero

# Simulate sending to worker (pickle roundtrip)
restored = pickle.loads(pickle.dumps(cached_store))

# Worker receives store with empty cache
print(restored.cache_size)  # 0
print(restored._max_size)  # 256 * 1024 * 1024 (preserved)
```

**Design rationale:**

1. **Cache contents are not serialized**: Serializing the full cache would defeat the purpose of distributed processing: workers would receive potentially huge payloads, and the data may not even be relevant to their partition.

2. **Fresh cache per worker**: Each worker builds its own cache based on its workload. For file-partitioned workloads (common in data processing), this is optimal: each worker caches only the files it processes (see the sketch after this list).

3. **Configuration is preserved**: The `max_size` and underlying store are preserved, so workers use the same caching policy as the main process.
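
Because unpickling always yields an empty cache, copies of the same store are fully independent; a quick check, reusing the `cached_store` from the example above:

```python
import pickle

# Two unpickled copies share no state: populating one leaves the other empty.
worker_a = pickle.loads(pickle.dumps(cached_store))
worker_b = pickle.loads(pickle.dumps(cached_store))

worker_a.get("file1.nc")
print(worker_a.cache_size > 0)  # True
print(worker_b.cache_size)      # 0 (independent caches)
```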

**Requirements for pickling:**

The underlying store (`_store`) must also be picklable. For cloud stores, this typically means using stores that can be reconstructed from configuration:

```python
import pickle

from obspec_utils.cache import CachingReadableStore
from obstore.store import MemoryStore, S3Store

# Works: store can be pickled (configuration-based)
s3_store = S3Store(bucket="my-bucket", region="us-east-1")
cached = CachingReadableStore(s3_store)
pickle.dumps(cached)  # OK

# May not work: some Rust-backed stores aren't picklable
mem_store = MemoryStore()
cached = CachingReadableStore(mem_store)
pickle.dumps(cached)  # TypeError: cannot pickle 'MemoryStore' object
```
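
If the underlying store cannot be pickled, one workaround is to build the store inside each worker rather than shipping it across the process boundary. A minimal sketch, where `make_store()` is a hypothetical factory you would write for your own configuration:

```python
from obspec_utils.cache import CachingReadableStore


def make_store():
    # Hypothetical factory: reconstruct the store from plain,
    # picklable configuration inside each worker process.
    from obstore.store import S3Store
    return S3Store(bucket="my-bucket", region="us-east-1")


def worker_task(path: str):
    # Nothing non-picklable crosses the process boundary; each worker
    # creates its own store and cache on first use.
    cached = CachingReadableStore(make_store(), max_size=64 * 1024 * 1024)
    return cached.get(path)
```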

### Distributed Usage Patterns

#### Pattern 1: File-Partitioned Workloads (Recommended)

When each worker processes a distinct set of files, per-worker caching works well:

```python
from concurrent.futures import ProcessPoolExecutor

from obspec_utils.cache import CachingReadableStore
from obstore.store import S3Store


def process_files(cached_store, file_paths):
    """Each worker gets its own cache, processes its own files."""
    results = []
    for path in file_paths:
        # First access: fetch from network, cache locally
        data = cached_store.get(path)
        # Subsequent accesses to same file: served from cache
        result = analyze(data)  # analyze() is a placeholder for user code
        results.append(result)
    return results


# Create cached store in main process
store = S3Store(bucket="my-bucket")
cached_store = CachingReadableStore(store, max_size=512 * 1024 * 1024)

# Partition files across workers
all_files = ["file1.nc", "file2.nc", "file3.nc", "file4.nc"]
partitions = [all_files[:2], all_files[2:]]

with ProcessPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(process_files, cached_store, partition)
        for partition in partitions
    ]
    results = [f.result() for f in futures]
```
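
Note that on start methods that spawn fresh interpreters (Windows, and macOS by default), `ProcessPoolExecutor` requires the submission code to run under a main guard so workers can import the module without re-executing it:

```python
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        ...  # submit process_files tasks as above
```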

#### Pattern 2: Dask Distributed

With Dask, the cached store is serialized to each worker:

```python
import dask
from dask.distributed import Client
from obspec_utils.cache import CachingReadableStore
from obstore.store import S3Store

client = Client()

store = S3Store(bucket="my-bucket")
cached_store = CachingReadableStore(store)


@dask.delayed
def process_file(cached_store, path):
    # Worker receives cached_store with empty cache
    # Cache builds up as this worker processes files
    data = cached_store.get(path)
    return analyze(data)  # analyze() is a placeholder for user code


# file_list: the paths to process, e.g. ["file1.nc", "file2.nc", ...]
tasks = [process_file(cached_store, f) for f in file_list]
results = dask.compute(*tasks)
```
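
With many tasks, the same `cached_store` is embedded in every task in the graph. One optional refinement (a sketch using dask.distributed's `Client.scatter`, not something this commit requires) is to scatter the store once so each worker holds a single copy instead of deserializing it per task:

```python
# Broadcast one copy of the store to every worker up front; tasks then
# reference the resulting Future rather than a fresh pickle per task.
store_future = client.scatter(cached_store, broadcast=True)

tasks = [process_file(store_future, f) for f in file_list]
results = dask.compute(*tasks)
```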

## Decision Guide

### Which reader should I use?

src/obspec_utils/cache.py

Lines changed: 14 additions & 0 deletions
@@ -113,6 +113,20 @@ def __getattr__(self, name: str) -> Any:
        """
        return getattr(self._store, name)

    def __reduce__(self):
        """Support pickling for multiprocessing and distributed frameworks.

        Returns a fresh instance with an empty cache. This is intentional:
        serializing the full cache contents would be inefficient for distributed
        workloads where each worker typically processes different files.

        The underlying store and max_size configuration are preserved.
        """
        return (
            self.__class__,
            (self._store, self._max_size),
        )

    def _add_to_cache(self, path: str, data: bytes) -> None:
        """Add data to cache, evicting LRU entries if needed.
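
For reference, unpickling goes through the `(callable, args)` pair that `__reduce__` returns; a minimal sketch of the equivalent reconstruction (assuming `store` is any picklable readable store):

```python
from obspec_utils.cache import CachingReadableStore

cached = CachingReadableStore(store, max_size=1024)

# pickle calls __reduce__ to get (callable, args), then calls callable(*args):
cls, args = cached.__reduce__()   # (CachingReadableStore, (store, 1024))
clone = cls(*args)                # fresh instance with an empty cache
```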

tests/test_cache.py

Lines changed: 187 additions & 0 deletions
@@ -1,5 +1,6 @@
"""Tests for CachingReadableStore."""

import pickle
import threading
from concurrent.futures import ThreadPoolExecutor

@@ -389,3 +390,189 @@ def test_forwards_unknown_attributes(self):
        # This tests that __getattr__ forwards correctly
        assert hasattr(cached, "put")  # MemoryStore has put
        assert hasattr(cached, "delete")  # MemoryStore has delete


class PicklableStore:
    """A simple picklable store for testing pickle support.

    MemoryStore from obstore is Rust-backed and not picklable.
    This pure-Python store allows testing CachingReadableStore's pickle support.
    """

    def __init__(self, data: dict[str, bytes] | None = None):
        self._data = data or {}

    def put(self, path: str, data: bytes) -> None:
        self._data[path] = data

    def get(self, path: str, *, options=None):
        return _PicklableGetResult(self._data[path])

    async def get_async(self, path: str, *, options=None):
        return _PicklableGetResultAsync(self._data[path])

    def get_range(
        self,
        path: str,
        *,
        start: int,
        end: int | None = None,
        length: int | None = None,
    ):
        data = self._data[path]
        if end is not None:
            return data[start:end]
        elif length is not None:
            return data[start : start + length]
        return data[start:]

    async def get_range_async(
        self,
        path: str,
        *,
        start: int,
        end: int | None = None,
        length: int | None = None,
    ):
        return self.get_range(path, start=start, end=end, length=length)

    def get_ranges(self, path: str, *, starts, ends=None, lengths=None):
        if ends is not None:
            return [self._data[path][s:e] for s, e in zip(starts, ends)]
        elif lengths is not None:
            return [
                self._data[path][start : start + length]
                for start, length in zip(starts, lengths)
            ]
        raise ValueError("Must provide ends or lengths")

    async def get_ranges_async(self, path: str, *, starts, ends=None, lengths=None):
        return self.get_ranges(path, starts=starts, ends=ends, lengths=lengths)


class _PicklableGetResult:
    """Mock GetResult for PicklableStore."""

    def __init__(self, data: bytes):
        self._data = data

    def buffer(self):
        return self._data


class _PicklableGetResultAsync:
    """Mock async GetResult for PicklableStore."""

    def __init__(self, data: bytes):
        self._data = data

    async def buffer_async(self):
        return self._data


class TestPickling:
    """Tests for pickling support (needed for multiprocessing/distributed)."""

    def test_pickle_roundtrip(self):
        """CachingReadableStore can be pickled and unpickled."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        cached = CachingReadableStore(source, max_size=128 * 1024 * 1024)

        # Pickle and unpickle
        pickled = pickle.dumps(cached)
        restored = pickle.loads(pickled)

        assert isinstance(restored, CachingReadableStore)

    def test_pickle_preserves_store_and_max_size(self):
        """Unpickled store preserves underlying store and max_size."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        custom_max_size = 64 * 1024 * 1024
        cached = CachingReadableStore(source, max_size=custom_max_size)

        restored = pickle.loads(pickle.dumps(cached))

        # max_size should be preserved
        assert restored._max_size == custom_max_size

        # underlying store should work (can fetch data)
        result = restored.get("file.txt")
        assert bytes(result.buffer()) == b"hello world"

    def test_pickle_creates_empty_cache(self):
        """Unpickled store has a fresh empty cache."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")
        source.put("file2.txt", b"more data")

        cached = CachingReadableStore(source)

        # Populate the cache
        cached.get("file.txt")
        cached.get("file2.txt")
        assert cached.cache_size > 0
        assert len(cached.cached_paths) == 2

        # Pickle and unpickle
        restored = pickle.loads(pickle.dumps(cached))

        # Restored cache should be empty
        assert restored.cache_size == 0
        assert len(restored.cached_paths) == 0

    def test_pickle_restored_store_is_functional(self):
        """Restored store can cache new data normally."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        cached = CachingReadableStore(source, max_size=100)
        cached.get("file.txt")

        restored = pickle.loads(pickle.dumps(cached))

        # Restored store should be able to fetch and cache
        result = restored.get("file.txt")
        assert bytes(result.buffer()) == b"hello world"
        assert "file.txt" in restored.cached_paths
        assert restored.cache_size == len(b"hello world")

    def test_pickle_restored_store_lru_works(self):
        """Restored store has working LRU eviction."""
        source = PicklableStore()
        source.put("file1.txt", b"a" * 100)
        source.put("file2.txt", b"b" * 100)
        source.put("file3.txt", b"c" * 100)

        cached = CachingReadableStore(source, max_size=200)

        restored = pickle.loads(pickle.dumps(cached))

        # Cache two files
        restored.get("file1.txt")
        restored.get("file2.txt")
        assert restored.cached_paths == ["file1.txt", "file2.txt"]

        # Third file should evict first
        restored.get("file3.txt")
        assert restored.cached_paths == ["file2.txt", "file3.txt"]

    def test_pickle_multiple_protocols(self):
        """Pickling works with different pickle protocols."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        cached = CachingReadableStore(source)
        cached.get("file.txt")

        # Test all available protocols
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            pickled = pickle.dumps(cached, protocol=protocol)
            restored = pickle.loads(pickled)

            assert restored.cache_size == 0  # Fresh cache
            result = restored.get("file.txt")
            assert bytes(result.buffer()) == b"hello world"
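
To run just the new tests (assuming a standard pytest setup): `pytest tests/test_cache.py -k TestPickling`.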
