@@ -117,7 +117,6 @@ def __init__(
117117 self ,
118118 url : str ,
119119 prefix : str = "schedule" ,
120- buffer_size : int = 50 ,
121120 serializer : Optional [TaskiqSerializer ] = None ,
122121 ** connection_kwargs : Any ,
123122 ) -> None :
@@ -126,7 +125,6 @@ def __init__(
126125 url ,
127126 ** connection_kwargs ,
128127 )
129- self .buffer_size = buffer_size
130128 if serializer is None :
131129 serializer = PickleSerializer ()
132130 self .serializer = serializer
@@ -156,19 +154,14 @@ async def get_schedules(self) -> List[ScheduledTask]:
156154 :return: list of schedules.
157155 """
158156 schedules = []
159- buffer = []
160157 async for key in self .redis .scan_iter (f"{ self .prefix } :*" ): # type: ignore[attr-defined]
161- buffer .append (key )
162- if len (buffer ) >= self .buffer_size :
163- schedules .extend (await self .redis .mget (buffer )) # type: ignore[attr-defined]
164- buffer = []
165- if buffer :
166- schedules .extend (await self .redis .mget (buffer )) # type: ignore[attr-defined]
167- return [
168- model_validate (ScheduledTask , self .serializer .loadb (schedule ))
169- for schedule in schedules
170- if schedule
171- ]
158+ raw_schedule = await self .redis .get (key ) # type: ignore[attr-defined]
159+ parsed_schedule = model_validate (
160+ ScheduledTask ,
161+ self .serializer .loadb (raw_schedule ),
162+ )
163+ schedules .append (parsed_schedule )
164+ return schedules
172165
173166 async def post_send (self , task : ScheduledTask ) -> None :
174167 """Delete a task after it's completed."""
0 commit comments