File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -95,17 +95,6 @@ ignore =
9595 ; Found too many public instance attributes
9696 WPS230,
9797
98- ; all init files
99- __init__.py:
100- ; ignore not used imports
101- F401,
102- ; ignore import with wildcard
103- F403,
104- ; Found wrong metadata variable
105- WPS410,
106- ; Found commented out cod
107- E800,
108-
10998per-file-ignores =
11099 ; all tests
111100 test_*.py,tests.py,tests_*.py,*/tests/*:
@@ -120,6 +109,21 @@ per-file-ignores =
120109 ; Found complex default value
121110 WPS404,
122111
112+ ; all init files
113+ __init__.py:
114+ ; ignore not used imports
115+ F401,
116+ ; ignore import with wildcard
117+ F403,
118+ ; Found wrong metadata variable
119+ WPS410,
120+ ; Found commented out cod
121+ E800,
122+
123+ taskiq/serialization.py:
124+ ; Found commented out code
125+ E800,
126+
123127exclude =
124128 ./.git,
125129 ./venv,
Original file line number Diff line number Diff line change @@ -86,6 +86,7 @@ def __init__(
8686 "Setting result backend with constructor is deprecated. "
8787 "Please use `with_result_backend` instead." ,
8888 TaskiqDeprecationWarning ,
89+ stacklevel = 2 ,
8990 )
9091 if task_id_generator is None :
9192 task_id_generator = default_id_generator
@@ -94,6 +95,7 @@ def __init__(
9495 "Setting id generator with constructor is deprecated. "
9596 "Please use `with_id_generator` instead." ,
9697 TaskiqDeprecationWarning ,
98+ stacklevel = 2 ,
9799 )
98100 self .middlewares : "List[TaskiqMiddleware]" = []
99101 self .result_backend = result_backend
Original file line number Diff line number Diff line change @@ -206,14 +206,19 @@ async def run_task( # noqa: C901, WPS210
206206 )
207207 dep_ctx = dependency_graph .async_ctx (broker_ctx )
208208 # Resolve all function's dependencies.
209- kwargs = await dep_ctx .resolve_kwargs ()
210-
211- # We udpate kwargs with kwargs from network.
212- kwargs .update (message .kwargs )
213209
214210 # Start a timer.
215211 start_time = time ()
212+
216213 try :
214+ # We put kwargs resolving here,
215+ # to be able to catch any exception (for example ),
216+ # that happen while resolving dependencies.
217+ if dep_ctx :
218+ kwargs = await dep_ctx .resolve_kwargs ()
219+ # We udpate kwargs with kwargs from network.
220+ kwargs .update (message .kwargs )
221+
217222 # If the function is a coroutine, we await it.
218223 if asyncio .iscoroutinefunction (target ):
219224 returned = await target (* message .args , ** kwargs )
Original file line number Diff line number Diff line change 1+ from typing import Generator
2+
13import pytest
24
5+ from taskiq .abc .broker import AsyncBroker
6+
37
48@pytest .fixture (scope = "session" )
59def anyio_backend () -> str :
@@ -10,3 +14,17 @@ def anyio_backend() -> str:
1014 :return: backend name.
1115 """
1216 return "asyncio"
17+
18+
19+ @pytest .fixture (autouse = True )
20+ def reset_broker () -> Generator [None , None , None ]:
21+ """
22+ Restore async broker.
23+
24+ This fixtures sets some global
25+ broker variables to default state.
26+ """
27+ yield
28+ AsyncBroker .available_tasks = {}
29+ AsyncBroker .is_worker_process = False
30+ AsyncBroker .is_scheduler_process = False
Original file line number Diff line number Diff line change @@ -20,3 +20,25 @@ async def task(context: Context = TaskiqDepends()) -> None:
2020 await kicked .wait_result ()
2121
2222 assert runs_count == 2
23+
24+
25+ @pytest .mark .anyio
26+ async def test_requeue_from_dependency () -> None :
27+ broker = InMemoryBroker ()
28+
29+ runs_count = 0
30+
31+ async def dep_func (context : Context = TaskiqDepends ()) -> None :
32+ nonlocal runs_count
33+ runs_count += 1
34+ if runs_count < 2 :
35+ await context .requeue ()
36+
37+ @broker .task
38+ async def task (_ : None = TaskiqDepends (dep_func )) -> None :
39+ return None
40+
41+ kicked = await task .kiq ()
42+ await kicked .wait_result ()
43+
44+ assert runs_count == 2
You can’t perform that action at this time.
0 commit comments