Skip to content

Commit a3da9ce

Browse files
authored
refactor: Per-store event loop for AiohttpStore (#59)
1 parent 43174a1 commit a3da9ce

2 files changed

Lines changed: 137 additions & 6 deletions

File tree

src/obspec_utils/stores/_aiohttp.py

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@
2626
from __future__ import annotations
2727

2828
import asyncio
29-
from collections.abc import AsyncIterator, Iterator, Sequence
29+
import threading
30+
from collections.abc import AsyncIterator, Coroutine, Iterator, Sequence
3031
from dataclasses import dataclass, field
3132
from datetime import datetime, timezone
32-
from typing import TYPE_CHECKING
33+
from typing import TYPE_CHECKING, TypeVar
3334

3435
from obspec import GetResult, GetResultAsync
3536

@@ -38,6 +39,8 @@
3839
if TYPE_CHECKING:
3940
from obspec import Attributes, GetOptions, ObjectMeta
4041

42+
T = TypeVar("T")
43+
4144
try:
4245
import aiohttp
4346
except ImportError as e:
@@ -196,6 +199,10 @@ def __init__(
196199
self.headers = headers or {}
197200
self.timeout = aiohttp.ClientTimeout(total=timeout)
198201
self._session: aiohttp.ClientSession | None = None
202+
# Event loop for sync methods when called from async context (e.g., Jupyter)
203+
self._sync_loop: asyncio.AbstractEventLoop | None = None
204+
self._sync_thread: threading.Thread | None = None
205+
self._sync_lock = threading.Lock()
199206

200207
async def __aenter__(self) -> "AiohttpStore":
201208
"""Enter the async context manager, creating a reusable session."""
@@ -211,6 +218,52 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
211218
await self._session.close()
212219
self._session = None
213220

221+
def _get_sync_loop(self) -> asyncio.AbstractEventLoop:
222+
"""Get or create event loop for sync operations when inside a running loop."""
223+
if self._sync_loop is None:
224+
with self._sync_lock:
225+
if self._sync_loop is None:
226+
loop = asyncio.new_event_loop()
227+
thread = threading.Thread(
228+
target=loop.run_forever,
229+
name=f"aiohttp_store_{id(self)}",
230+
daemon=True,
231+
)
232+
thread.start()
233+
self._sync_loop = loop
234+
self._sync_thread = thread
235+
return self._sync_loop
236+
237+
def _run_sync(self, coro: Coroutine[None, None, T]) -> T:
238+
"""Run coroutine synchronously, handling nested event loops (e.g., Jupyter)."""
239+
try:
240+
asyncio.get_running_loop()
241+
except RuntimeError:
242+
# No running loop - use asyncio.run() directly
243+
return asyncio.run(coro)
244+
245+
# Inside running loop - use store's dedicated loop
246+
loop = self._get_sync_loop()
247+
future = asyncio.run_coroutine_threadsafe(coro, loop)
248+
return future.result()
249+
250+
def _cleanup_sync_loop(self) -> None:
251+
"""Stop the sync loop and thread."""
252+
if self._sync_loop is not None:
253+
self._sync_loop.call_soon_threadsafe(self._sync_loop.stop)
254+
if self._sync_thread is not None:
255+
self._sync_thread.join(timeout=1.0)
256+
self._sync_loop = None
257+
self._sync_thread = None
258+
259+
def close(self) -> None:
260+
"""Close the store and release resources."""
261+
self._cleanup_sync_loop()
262+
263+
def __del__(self) -> None:
264+
"""Clean up on garbage collection."""
265+
self._cleanup_sync_loop()
266+
214267
def _build_url(self, path: str) -> str:
215268
"""Build the full URL from base URL and path."""
216269
path = path.removeprefix("/")
@@ -497,7 +550,7 @@ def get(
497550
AiohttpGetResult
498551
Result object with buffer() method and metadata.
499552
"""
500-
result = asyncio.run(self.get_async(path, options=options))
553+
result = self._run_sync(self.get_async(path, options=options))
501554
return AiohttpGetResult(
502555
_data=result._data,
503556
_meta=result._meta,
@@ -534,7 +587,7 @@ def get_range(
534587
bytes
535588
The requested byte range.
536589
"""
537-
return asyncio.run(
590+
return self._run_sync(
538591
self.get_range_async(path, start=start, end=end, length=length)
539592
)
540593

@@ -567,7 +620,7 @@ def get_ranges(
567620
Sequence[bytes]
568621
The requested byte ranges.
569622
"""
570-
return asyncio.run(
623+
return self._run_sync(
571624
self.get_ranges_async(path, starts=starts, ends=ends, lengths=lengths)
572625
)
573626

@@ -627,7 +680,7 @@ def head(self, path: str) -> ObjectMeta:
627680
ObjectMeta
628681
File metadata including size, last_modified, e_tag, etc.
629682
"""
630-
return asyncio.run(self.head_async(path))
683+
return self._run_sync(self.head_async(path))
631684

632685

633686
__all__ = ["AiohttpStore", "AiohttpGetResult", "AiohttpGetResultAsync"]

tests/test_aiohttp.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,3 +657,81 @@ def test_head_sync(minio_test_file):
657657
assert meta["path"] == minio_test_file["path"]
658658
assert meta["e_tag"] is not None
659659
assert meta["last_modified"] is not None
660+
661+
662+
# --- Nested Event Loop Handling (Jupyter compatibility) ---
663+
664+
665+
@requires_minio
666+
@pytest.mark.asyncio
667+
async def test_sync_methods_from_running_loop(minio_test_file):
668+
"""
669+
Sync methods work when called from within a running event loop.
670+
671+
This simulates the Jupyter notebook environment where an event loop
672+
is already running. The per-store event loop design handles this by
673+
creating a dedicated thread with its own event loop for sync operations.
674+
"""
675+
store = AiohttpStore(minio_test_file["base_url"])
676+
677+
# We're inside an async function, so there's a running event loop.
678+
# Calling sync methods would fail with asyncio.run() but should
679+
# work with the per-store event loop implementation.
680+
try:
681+
# Test head (sync)
682+
meta = store.head(minio_test_file["path"])
683+
assert meta["size"] == len(minio_test_file["content"])
684+
685+
# Test get (sync)
686+
result = store.get(minio_test_file["path"])
687+
assert result.buffer() == minio_test_file["content"]
688+
689+
# Test get_range (sync)
690+
data = store.get_range(minio_test_file["path"], start=0, length=5)
691+
assert bytes(data) == b"01234"
692+
693+
# Test get_ranges (sync)
694+
results = store.get_ranges(
695+
minio_test_file["path"], starts=[0, 10], lengths=[5, 6]
696+
)
697+
assert [bytes(r) for r in results] == [b"01234", b"ABCDEF"]
698+
699+
finally:
700+
store.close()
701+
702+
703+
def test_sync_loop_not_created_outside_async():
704+
"""Sync loop is not created when not inside a running event loop."""
705+
store = AiohttpStore("https://example.com")
706+
707+
# Before any sync call
708+
assert store._sync_loop is None
709+
assert store._sync_thread is None
710+
711+
# close() should be safe even if loop was never created
712+
store.close()
713+
assert store._sync_loop is None
714+
715+
716+
@requires_minio
717+
@pytest.mark.asyncio
718+
async def test_sync_loop_created_inside_async(minio_test_file):
719+
"""Sync loop is lazily created when sync method called from async context."""
720+
store = AiohttpStore(minio_test_file["base_url"])
721+
722+
# Before sync call
723+
assert store._sync_loop is None
724+
assert store._sync_thread is None
725+
726+
# Call sync method from async context
727+
_ = store.head(minio_test_file["path"])
728+
729+
# Sync loop should now exist
730+
assert store._sync_loop is not None
731+
assert store._sync_thread is not None
732+
assert store._sync_thread.is_alive()
733+
734+
# Cleanup
735+
store.close()
736+
assert store._sync_loop is None
737+
assert store._sync_thread is None

0 commit comments

Comments
 (0)