Skip to content

Commit d4b3e4d

Browse files
authored
Increase default size of ParallelStoreReader cache (#35)
1 parent 3874253 commit d4b3e4d

2 files changed

Lines changed: 216 additions & 4 deletions

File tree

src/obspec_utils/obspec.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ def __init__(
550550
self,
551551
store: ReadableStore,
552552
path: str,
553-
chunk_size: int = 256 * 1024,
553+
chunk_size: int = 1024 * 1024,
554554
max_cached_chunks: int = 64,
555555
) -> None:
556556
"""
@@ -563,10 +563,12 @@ def __init__(
563563
path
564564
The path to the file within the store.
565565
chunk_size
566-
Size of each chunk in bytes. Smaller chunks mean more granular caching
567-
but potentially more requests.
566+
Size of each chunk in bytes. Default is 1 MB, tuned for cloud object
567+
stores where HTTP request overhead is significant. Smaller chunks mean
568+
more granular caching but more requests.
568569
max_cached_chunks
569-
Maximum number of chunks to keep in the LRU cache.
570+
Maximum number of chunks to keep in the LRU cache. Default is 64,
571+
giving a 64 MB cache with the default chunk size.
570572
"""
571573
self._store = store
572574
self._path = path

tests/test_parallel_reader.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from obstore.store import MemoryStore
44

55
from obspec_utils.obspec import ParallelStoreReader
6+
from obspec_utils.tracing import RequestTrace, TracingReadableStore
67

78

89
def test_parallel_reader_cross_chunk_read():
@@ -54,3 +55,212 @@ def test_parallel_reader_read_spanning_more_chunks_than_cache():
5455
)
5556

5657
assert reader.read(20) == b"0123456789ABCDEFGHIJ"
58+
59+
60+
def test_parallel_reader_lru_eviction_order():
61+
"""LRU eviction should evict least recently used, not oldest inserted."""
62+
memstore = MemoryStore()
63+
# 16 bytes = 4 chunks of 4 bytes each
64+
memstore.put("test.txt", b"0123456789ABCDEF")
65+
66+
trace = RequestTrace()
67+
traced = TracingReadableStore(memstore, trace)
68+
reader = ParallelStoreReader(traced, "test.txt", chunk_size=4, max_cached_chunks=2)
69+
70+
# Read chunk 0
71+
reader.seek(0)
72+
reader.read(4) # fetches chunk 0
73+
74+
# Read chunk 1
75+
reader.seek(4)
76+
reader.read(4) # fetches chunk 1, cache = [0, 1]
77+
78+
# Re-read chunk 0 (makes it most recently used)
79+
reader.seek(0)
80+
reader.read(4) # cache hit, cache = [1, 0]
81+
82+
trace.clear()
83+
84+
# Read chunk 2 - should evict chunk 1 (LRU), not chunk 0
85+
reader.seek(8)
86+
reader.read(4) # fetches chunk 2, cache = [0, 2]
87+
88+
# Chunk 0 should still be cached (no request)
89+
trace.clear()
90+
reader.seek(0)
91+
reader.read(4)
92+
assert trace.total_requests == 0, "Chunk 0 should still be in cache"
93+
94+
# Chunk 1 should have been evicted (needs request)
95+
trace.clear()
96+
reader.seek(4)
97+
reader.read(4)
98+
assert trace.total_requests == 1, "Chunk 1 should have been evicted"
99+
100+
101+
def test_parallel_reader_cache_hit_no_requests():
102+
"""Re-reading cached data should not make new store requests."""
103+
memstore = MemoryStore()
104+
memstore.put("test.txt", b"0123456789ABCDEF")
105+
106+
trace = RequestTrace()
107+
traced = TracingReadableStore(memstore, trace)
108+
reader = ParallelStoreReader(traced, "test.txt", chunk_size=4, max_cached_chunks=4)
109+
110+
# Initial read
111+
reader.read(8) # fetches chunks 0 and 1
112+
113+
# Re-read same data
114+
trace.clear()
115+
reader.seek(0)
116+
reader.read(8)
117+
118+
assert trace.total_requests == 0, f"Expected 0 requests, got {trace.total_requests}"
119+
120+
121+
def test_parallel_reader_partial_cache_hit():
122+
"""Read spanning cached and uncached chunks should only fetch uncached."""
123+
memstore = MemoryStore()
124+
memstore.put("test.txt", b"0123456789ABCDEF")
125+
126+
trace = RequestTrace()
127+
traced = TracingReadableStore(memstore, trace)
128+
reader = ParallelStoreReader(traced, "test.txt", chunk_size=4, max_cached_chunks=4)
129+
130+
# Read chunk 0 only
131+
reader.read(4)
132+
trace.clear()
133+
134+
# Read chunks 0 and 1 - should only fetch chunk 1
135+
reader.seek(0)
136+
reader.read(8)
137+
138+
# Should have 1 get_ranges request for chunk 1 only
139+
assert trace.total_requests == 1
140+
assert trace.requests[0].start == 4 # chunk 1 starts at byte 4
141+
assert trace.requests[0].length == 4
142+
143+
144+
def test_parallel_reader_read_within_single_chunk():
145+
"""Multiple reads within the same chunk should reuse cache."""
146+
memstore = MemoryStore()
147+
memstore.put("test.txt", b"0123456789ABCDEF")
148+
149+
trace = RequestTrace()
150+
traced = TracingReadableStore(memstore, trace)
151+
reader = ParallelStoreReader(traced, "test.txt", chunk_size=8, max_cached_chunks=2)
152+
153+
# First read fetches chunk 0
154+
reader.read(2) # "01"
155+
assert trace.total_requests == 2 # get (size) + get_ranges (chunk)
156+
trace.clear()
157+
158+
# Subsequent reads within same chunk
159+
reader.read(2) # "23"
160+
assert trace.total_requests == 0
161+
162+
reader.seek(6)
163+
reader.read(2) # "67"
164+
assert trace.total_requests == 0
165+
166+
167+
def test_parallel_reader_read_at_chunk_boundary():
168+
"""Read starting exactly at chunk boundary."""
169+
memstore = MemoryStore()
170+
memstore.put("test.txt", b"0123456789ABCDEF")
171+
172+
reader = ParallelStoreReader(
173+
memstore, "test.txt", chunk_size=4, max_cached_chunks=4
174+
)
175+
176+
# Read exactly at boundaries
177+
reader.seek(4)
178+
assert reader.read(4) == b"4567"
179+
180+
reader.seek(8)
181+
assert reader.read(4) == b"89AB"
182+
183+
reader.seek(12)
184+
assert reader.read(4) == b"CDEF"
185+
186+
187+
def test_parallel_reader_last_chunk_smaller():
188+
"""Last chunk smaller than chunk_size is handled correctly."""
189+
memstore = MemoryStore()
190+
# 10 bytes with chunk_size=4: chunks are [0-3], [4-7], [8-9]
191+
memstore.put("test.txt", b"0123456789")
192+
193+
reader = ParallelStoreReader(
194+
memstore, "test.txt", chunk_size=4, max_cached_chunks=4
195+
)
196+
197+
# Read the partial last chunk
198+
reader.seek(8)
199+
assert reader.read(4) == b"89" # only 2 bytes available
200+
201+
# Read spanning into partial chunk
202+
reader.seek(6)
203+
assert reader.read(10) == b"6789" # 4 bytes available
204+
205+
206+
def test_parallel_reader_read_zero_no_cache_effect():
207+
"""read(0) should not fetch or modify cache."""
208+
memstore = MemoryStore()
209+
memstore.put("test.txt", b"0123456789ABCDEF")
210+
211+
trace = RequestTrace()
212+
traced = TracingReadableStore(memstore, trace)
213+
reader = ParallelStoreReader(traced, "test.txt", chunk_size=4, max_cached_chunks=2)
214+
215+
# Prepopulate cache with chunk 0
216+
reader.read(4)
217+
trace.clear()
218+
219+
# read(0) should do nothing
220+
result = reader.read(0)
221+
assert result == b""
222+
assert trace.total_requests == 0
223+
assert len(reader._cache) == 1 # cache unchanged
224+
225+
226+
def test_parallel_reader_seek_preserves_cache():
227+
"""Seeking should not invalidate the cache."""
228+
memstore = MemoryStore()
229+
memstore.put("test.txt", b"0123456789ABCDEF")
230+
231+
trace = RequestTrace()
232+
traced = TracingReadableStore(memstore, trace)
233+
reader = ParallelStoreReader(traced, "test.txt", chunk_size=4, max_cached_chunks=4)
234+
235+
# Read chunks 0 and 1
236+
reader.read(8)
237+
trace.clear()
238+
239+
# Seek around without reading
240+
reader.seek(0)
241+
reader.seek(100)
242+
reader.seek(4)
243+
reader.seek(0, 2) # SEEK_END
244+
245+
assert trace.total_requests == 0, "Seeking should not make requests"
246+
247+
# Read from cached region
248+
reader.seek(0)
249+
reader.read(8)
250+
assert trace.total_requests == 0, "Cached data should still be available"
251+
252+
253+
def test_parallel_reader_cache_cleared_on_close():
254+
"""Cache should be cleared after close()."""
255+
memstore = MemoryStore()
256+
memstore.put("test.txt", b"0123456789ABCDEF")
257+
258+
reader = ParallelStoreReader(
259+
memstore, "test.txt", chunk_size=4, max_cached_chunks=4
260+
)
261+
262+
reader.read(8) # populate cache
263+
assert len(reader._cache) == 2
264+
265+
reader.close()
266+
assert len(reader._cache) == 0

0 commit comments

Comments
 (0)