Skip to content

Commit e4cf262

Browse files
authored
Fix!: Always use head for file size determination (#39)
1 parent eadc322 commit e4cf262

12 files changed

Lines changed: 347 additions & 203 deletions

src/obspec_utils/aiohttp.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,5 +571,61 @@ def get_ranges(
571571
self.get_ranges_async(path, starts=starts, ends=ends, lengths=lengths)
572572
)
573573

574+
# --- Head methods ---
575+
576+
async def _do_head_async(
577+
self,
578+
session: aiohttp.ClientSession,
579+
path: str,
580+
) -> ObjectMeta:
581+
"""Internal method that performs the actual HEAD request."""
582+
url = self._build_url(path)
583+
request_headers = {} if self._session else dict(self.headers)
584+
585+
async with session.head(url, headers=request_headers) as response:
586+
response.raise_for_status()
587+
return self._parse_meta_from_headers(path, dict(response.headers))
588+
589+
async def head_async(self, path: str) -> ObjectMeta:
590+
"""
591+
Get file metadata asynchronously via HEAD request.
592+
593+
Parameters
594+
----------
595+
path
596+
Path to the file relative to base_url.
597+
598+
Returns
599+
-------
600+
ObjectMeta
601+
File metadata including size, last_modified, e_tag, etc.
602+
"""
603+
if self._session is not None:
604+
return await self._do_head_async(self._session, path)
605+
606+
# Fallback: create a temporary session for this request
607+
async with aiohttp.ClientSession(
608+
timeout=self.timeout, headers=self.headers
609+
) as session:
610+
return await self._do_head_async(session, path)
611+
612+
def head(self, path: str) -> ObjectMeta:
613+
"""
614+
Get file metadata synchronously via HEAD request.
615+
616+
This wraps the async implementation for convenience.
617+
618+
Parameters
619+
----------
620+
path
621+
Path to the file relative to base_url.
622+
623+
Returns
624+
-------
625+
ObjectMeta
626+
File metadata including size, last_modified, e_tag, etc.
627+
"""
628+
return asyncio.run(self.head_async(path))
629+
574630

575631
__all__ = ["AiohttpStore", "AiohttpGetResult", "AiohttpGetResultAsync"]

src/obspec_utils/cache.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
if TYPE_CHECKING:
1919
from collections.abc import Buffer
2020

21-
from obspec import GetOptions, GetResult, GetResultAsync
21+
from obspec import GetOptions, GetResult, GetResultAsync, ObjectMeta
2222

2323

2424
class CachingReadableStore(ReadableStore):
@@ -88,7 +88,8 @@ def __init__(self, store: ReadableStore, max_size: int = 256 * 1024 * 1024) -> N
8888
Any object implementing the full read interface: [Get][obspec.Get],
8989
[GetAsync][obspec.GetAsync], [GetRange][obspec.GetRange],
9090
[GetRangeAsync][obspec.GetRangeAsync], [GetRanges][obspec.GetRanges],
91-
and [GetRangesAsync][obspec.GetRangesAsync].
91+
[GetRangesAsync][obspec.GetRangesAsync], [Head][obspec.Head],
92+
and [HeadAsync][obspec.HeadAsync].
9293
max_size
9394
Maximum cache size in bytes. Default: 256 MB.
9495
"""
@@ -281,5 +282,13 @@ async def get_ranges_async(
281282
path, starts=starts, ends=ends, lengths=lengths
282283
)
283284

285+
def head(self, path: str) -> ObjectMeta:
286+
"""Get file metadata (delegates to underlying store)."""
287+
return self._store.head(path)
288+
289+
async def head_async(self, path: str) -> ObjectMeta:
290+
"""Get file metadata async (delegates to underlying store)."""
291+
return await self._store.head_async(path)
292+
284293

285294
__all__ = ["CachingReadableStore"]

src/obspec_utils/obspec.py

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,22 @@
1111
GetRangeAsync,
1212
GetRanges,
1313
GetRangesAsync,
14+
Head,
15+
HeadAsync,
1416
)
1517

1618

1719
@runtime_checkable
1820
class ReadableStore(
19-
Get, GetAsync, GetRange, GetRangeAsync, GetRanges, GetRangesAsync, Protocol
21+
Get,
22+
GetAsync,
23+
GetRange,
24+
GetRangeAsync,
25+
GetRanges,
26+
GetRangesAsync,
27+
Head,
28+
HeadAsync,
29+
Protocol,
2030
):
2131
"""
2232
Full read interface for transparent store wrappers.
@@ -31,6 +41,7 @@ class ReadableStore(
3141
- [Get][obspec.Get] / [GetAsync][obspec.GetAsync]: Download entire files
3242
- [GetRange][obspec.GetRange] / [GetRangeAsync][obspec.GetRangeAsync]: Download byte ranges
3343
- [GetRanges][obspec.GetRanges] / [GetRangesAsync][obspec.GetRangesAsync]: Download multiple ranges
44+
- [Head][obspec.Head] / [HeadAsync][obspec.HeadAsync]: Get file metadata (size, etag, etc.)
3445
3546
Note: This is a flat composition of obspec protocols, not a hierarchical tier.
3647
For parsers with specific requirements, compose your own protocols directly
@@ -166,11 +177,12 @@ class BufferedStoreReader:
166177
[ParallelStoreReader][obspec_utils.obspec.ParallelStoreReader] : Uses parallel requests with LRU caching for sparse access.
167178
"""
168179

169-
class Store(Get, GetRange, Protocol):
180+
class Store(Get, GetRange, Head, Protocol):
170181
"""
171182
Store protocol required by BufferedStoreReader.
172183
173-
Combines [Get][obspec.Get] and [GetRange][obspec.GetRange] from obspec.
184+
Combines [Get][obspec.Get], [GetRange][obspec.GetRange], and
185+
[Head][obspec.Head] from obspec.
174186
"""
175187

176188
pass
@@ -204,10 +216,9 @@ def __init__(
204216
self._buffer_start = 0
205217

206218
def _get_size(self) -> int:
207-
"""Lazily fetch the file size via a get() call."""
219+
"""Lazily fetch the file size via a head() call."""
208220
if self._size is None:
209-
result = self._store.get(self._path)
210-
self._size = result.meta["size"]
221+
self._size = self._store.head(self._path)["size"]
211222
return self._size
212223

213224
def read(self, size: int = -1, /) -> bytes:
@@ -345,8 +356,7 @@ class EagerStoreReader:
345356
By default, the file is fetched using parallel range requests via
346357
`get_ranges()`, which can significantly improve load time for large files.
347358
The defaults (12 MB request size, max 18 concurrent requests) are tuned for
348-
cloud storage. If the store supports the `Head` protocol, the file size
349-
will be determined automatically via a HEAD request.
359+
cloud storage. The file size is determined automatically via a HEAD request.
350360
351361
The parallel fetching strategy is based on Icechunk's approach:
352362
https://github.com/earth-mover/icechunk/blob/main/icechunk/src/storage/mod.rs
@@ -376,13 +386,12 @@ class EagerStoreReader:
376386
[ParallelStoreReader][obspec_utils.obspec.ParallelStoreReader] : Uses parallel requests with LRU caching for sparse access.
377387
"""
378388

379-
class Store(Get, GetRanges, Protocol):
389+
class Store(Get, GetRanges, Head, Protocol):
380390
"""
381391
Store protocol required by EagerStoreReader.
382392
383-
Combines [Get][obspec.Get] and [GetRanges][obspec.GetRanges] from obspec.
384-
Optionally, the store may implement [Head][obspec.Head] for automatic
385-
file size detection.
393+
Combines [Get][obspec.Get], [GetRanges][obspec.GetRanges], and
394+
[Head][obspec.Head] from obspec.
386395
"""
387396

388397
pass
@@ -403,18 +412,18 @@ def __init__(
403412
Parameters
404413
----------
405414
store
406-
Any object implementing [Get][obspec.Get] and [GetRanges][obspec.GetRanges].
407-
Optionally implements [Head][obspec.Head] for automatic file size detection.
415+
Any object implementing [Get][obspec.Get], [GetRanges][obspec.GetRanges],
416+
and [Head][obspec.Head].
408417
path
409418
The path to the file within the store.
410419
request_size
411420
Target size for each parallel range request in bytes. Default is 12 MB,
412421
tuned for cloud storage throughput. The file will be divided into
413422
parts of this size and fetched using `get_ranges()`.
414423
file_size
415-
File size in bytes. If not provided, the reader will attempt to get
416-
the size via `store.head()` if the store supports [Head][obspec.Head].
417-
If the size cannot be determined, falls back to a single `get()` request.
424+
File size in bytes. If not provided, the size is determined via
425+
`store.head()`. Pass this to skip the HEAD request if you already
426+
know the file size.
418427
max_concurrent_requests
419428
Maximum number of parallel range requests. Default is 18. If the file
420429
would require more requests than this, request sizes are increased to
@@ -425,14 +434,7 @@ def __init__(
425434

426435
# Determine file size if not provided
427436
if file_size is None:
428-
if hasattr(store, "head") and callable(store.head):
429-
file_size = store.head(path)["size"]
430-
else:
431-
# Fall back to single request if we can't determine size
432-
result = store.get(path)
433-
data = bytes(result.buffer())
434-
self._buffer = io.BytesIO(data)
435-
return
437+
file_size = store.head(path)["size"]
436438

437439
# Handle empty files
438440
if file_size == 0:
@@ -538,11 +540,12 @@ class ParallelStoreReader:
538540
[EagerStoreReader][obspec_utils.obspec.EagerStoreReader] : Loads entire file into memory for fast random access.
539541
"""
540542

541-
class Store(Get, GetRanges, Protocol):
543+
class Store(Get, GetRanges, Head, Protocol):
542544
"""
543545
Store protocol required by ParallelStoreReader.
544546
545-
Combines [Get][obspec.Get] and [GetRanges][obspec.GetRanges] from obspec.
547+
Combines [Get][obspec.Get], [GetRanges][obspec.GetRanges], and
548+
[Head][obspec.Head] from obspec.
546549
"""
547550

548551
pass
@@ -581,10 +584,9 @@ def __init__(
581584
self._cache: OrderedDict[int, bytes] = OrderedDict()
582585

583586
def _get_size(self) -> int:
584-
"""Lazily fetch the file size via a get() call."""
587+
"""Lazily fetch the file size via a head() call."""
585588
if self._size is None:
586-
result = self._store.get(self._path)
587-
self._size = result.meta["size"]
589+
self._size = self._store.head(self._path)["size"]
588590
return self._size
589591

590592
def _get_chunks(self, chunk_indices: list[int]) -> dict[int, bytes]:

src/obspec_utils/splitting.py

Lines changed: 33 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
if TYPE_CHECKING:
1717
from collections.abc import Buffer
1818

19-
from obspec import GetOptions, GetResult, GetResultAsync
19+
from obspec import GetOptions, GetResult, GetResultAsync, ObjectMeta
2020

2121

2222
class SplittingReadableStore(ReadableStore):
@@ -62,10 +62,6 @@ class SplittingReadableStore(ReadableStore):
6262
The parallel fetching strategy is based on Icechunk's approach:
6363
https://github.com/earth-mover/icechunk/blob/main/icechunk/src/storage/mod.rs
6464
65-
**File size detection**: The wrapper attempts to determine file size via
66-
head() if the store supports it. If not available, it falls back to a
67-
single get() request (no splitting).
68-
6965
Examples
7066
--------
7167
Basic usage:
@@ -123,7 +119,8 @@ def __init__(
123119
Any object implementing the full read interface: [Get][obspec.Get],
124120
[GetAsync][obspec.GetAsync], [GetRange][obspec.GetRange],
125121
[GetRangeAsync][obspec.GetRangeAsync], [GetRanges][obspec.GetRanges],
126-
and [GetRangesAsync][obspec.GetRangesAsync].
122+
[GetRangesAsync][obspec.GetRangesAsync], [Head][obspec.Head],
123+
and [HeadAsync][obspec.HeadAsync].
127124
request_size
128125
Target size for each parallel range request. Default: 12 MB.
129126
max_concurrent_requests
@@ -151,26 +148,6 @@ def __getattr__(self, name: str) -> Any:
151148
)
152149
return getattr(self._store, name)
153150

154-
def _get_file_size(self, path: str) -> int | None:
155-
"""Try to get file size via head(), return None if not available."""
156-
if hasattr(self._store, "head") and callable(self._store.head):
157-
try:
158-
return self._store.head(path)["size"]
159-
except Exception:
160-
return None
161-
return None
162-
163-
async def _get_file_size_async(self, path: str) -> int | None:
164-
"""Async version of _get_file_size."""
165-
if hasattr(self._store, "head_async") and callable(self._store.head_async):
166-
try:
167-
result = await self._store.head_async(path)
168-
return result["size"]
169-
except Exception:
170-
return None
171-
# Fall back to sync head if available
172-
return self._get_file_size(path)
173-
174151
def _compute_ranges(self, file_size: int) -> tuple[list[int], list[int]] | None:
175152
"""Compute start positions and lengths for parallel fetching.
176153
@@ -216,40 +193,37 @@ async def _wrap_as_get_result_async(self, path: str, data: bytes) -> GetResultAs
216193
def get(self, path: str, *, options: GetOptions | None = None) -> GetResult:
217194
"""Get file, using parallel fetching if beneficial.
218195
219-
If the file size can be determined and the file is large enough to
220-
benefit from splitting, fetches via parallel get_ranges(). Otherwise
221-
falls back to a single get() request.
196+
If the file is large enough to benefit from splitting, fetches via
197+
parallel get_ranges(). Otherwise falls back to a single get() request.
222198
"""
223-
file_size = self._get_file_size(path)
199+
file_size = self.head(path)["size"]
200+
ranges = self._compute_ranges(file_size)
224201

225-
if file_size is not None:
226-
ranges = self._compute_ranges(file_size)
227-
if ranges is not None:
228-
starts, lengths = ranges
229-
results = self._store.get_ranges(path, starts=starts, lengths=lengths)
230-
data = b"".join(bytes(part) for part in results)
231-
return self._wrap_as_get_result(path, data)
202+
if ranges is not None:
203+
starts, lengths = ranges
204+
results = self._store.get_ranges(path, starts=starts, lengths=lengths)
205+
data = b"".join(bytes(part) for part in results)
206+
return self._wrap_as_get_result(path, data)
232207

233-
# Fall back to regular get
208+
# Fall back to regular get (file too small for splitting)
234209
return self._store.get(path, options=options)
235210

236211
async def get_async(
237212
self, path: str, *, options: GetOptions | None = None
238213
) -> GetResultAsync:
239214
"""Async get, using parallel fetching if beneficial."""
240-
file_size = await self._get_file_size_async(path)
241-
242-
if file_size is not None:
243-
ranges = self._compute_ranges(file_size)
244-
if ranges is not None:
245-
starts, lengths = ranges
246-
results = await self._store.get_ranges_async(
247-
path, starts=starts, lengths=lengths
248-
)
249-
data = b"".join(bytes(part) for part in results)
250-
return await self._wrap_as_get_result_async(path, data)
251-
252-
# Fall back to regular get_async
215+
file_size = (await self.head_async(path))["size"]
216+
ranges = self._compute_ranges(file_size)
217+
218+
if ranges is not None:
219+
starts, lengths = ranges
220+
results = await self._store.get_ranges_async(
221+
path, starts=starts, lengths=lengths
222+
)
223+
data = b"".join(bytes(part) for part in results)
224+
return await self._wrap_as_get_result_async(path, data)
225+
226+
# Fall back to regular get_async (file too small for splitting)
253227
return await self._store.get_async(path, options=options)
254228

255229
# Pass through range methods unchanged - caller already sized appropriately
@@ -302,5 +276,13 @@ async def get_ranges_async(
302276
path, starts=starts, ends=ends, lengths=lengths
303277
)
304278

279+
def head(self, path: str) -> ObjectMeta:
280+
"""Get file metadata (delegates to underlying store)."""
281+
return self._store.head(path)
282+
283+
async def head_async(self, path: str) -> ObjectMeta:
284+
"""Get file metadata async (delegates to underlying store)."""
285+
return await self._store.head_async(path)
286+
305287

306288
__all__ = ["SplittingReadableStore"]

0 commit comments

Comments (0)