Skip to content

Commit 7639c88

Browse files
committed
Merge branch 'release/3.3.2'
2 parents 02a2237 + 7f0dde9 commit 7639c88

3 files changed

Lines changed: 56 additions & 16 deletions

File tree

_python_utils_tests/test_generators.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
import pytest
24

35
import python_utils
@@ -16,12 +18,42 @@ async def test_abatcher():
1618
async def test_abatcher_timed():
1719
batches = []
1820
async for batch in python_utils.abatcher(
19-
python_utils.acount(stop=10, delay=0.08), interval=0.2
21+
python_utils.acount(stop=10, delay=0.08), interval=0.1
2022
):
2123
batches.append(batch)
2224

23-
assert len(batches) == 3
24-
assert sum(len(batch) for batch in batches) == 10
25+
assert batches == [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]
26+
assert len(batches) == 5
27+
28+
29+
@pytest.mark.asyncio
30+
async def test_abatcher_timed_with_timeout():
31+
async def generator():
32+
# Test if the timeout is respected
33+
yield 0
34+
yield 1
35+
await asyncio.sleep(0.11)
36+
37+
# Test if the timeout is respected
38+
yield 2
39+
yield 3
40+
await asyncio.sleep(0.11)
41+
42+
# Test if exceptions are handled correctly
43+
await asyncio.wait_for(asyncio.sleep(1), timeout=0.05)
44+
45+
# Test if StopAsyncIteration is handled correctly
46+
yield 4
47+
48+
batcher = python_utils.abatcher(generator(), interval=0.1)
49+
assert await batcher.__anext__() == [0, 1]
50+
assert await batcher.__anext__() == [2, 3]
51+
52+
with pytest.raises(asyncio.TimeoutError):
53+
await batcher.__anext__()
54+
55+
with pytest.raises(StopAsyncIteration):
56+
await batcher.__anext__()
2557

2658

2759
def test_batcher():

python_utils/__about__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
)
88
__url__: str = 'https://github.com/WoLpH/python-utils'
99
# Omit type info due to automatic versioning script
10-
__version__ = '3.3.1'
10+
__version__ = '3.3.2'

python_utils/generators.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,35 @@ async def abatcher(
1818

1919
assert batch_size or interval, 'Must specify either batch_size or interval'
2020

21+
# If interval is specified, use it to determine when to yield the batch
22+
# Alternatively set a really long timeout to keep the code simpler
2123
if interval:
2224
interval_s = python_utils.delta_to_seconds(interval)
23-
next_yield = time.perf_counter() + interval_s
2425
else:
25-
interval_s = 0
26-
next_yield = 0
26+
# Set the timeout to 10 years
27+
interval_s = 60 * 60 * 24 * 365 * 10.0
28+
29+
next_yield = time.perf_counter() + interval_s
30+
31+
pending: types.Set = set()
2732

2833
while True:
2934
try:
30-
if interval_s:
31-
item = await asyncio.wait_for(
32-
generator.__anext__(), interval_s
33-
)
34-
else:
35-
item = await generator.__anext__()
36-
except (StopAsyncIteration, asyncio.TimeoutError):
35+
done, pending = await asyncio.wait(
36+
pending or [generator.__anext__()],
37+
timeout=interval_s,
38+
return_when=asyncio.FIRST_COMPLETED,
39+
)
40+
41+
if done:
42+
for result in done:
43+
batch.append(result.result())
44+
45+
except StopAsyncIteration:
3746
if batch:
3847
yield batch
48+
3949
break
40-
else:
41-
batch.append(item)
4250

4351
if batch_size is not None and len(batch) == batch_size:
4452
yield batch

0 commit comments

Comments
 (0)