11import asyncio
22from datetime import datetime , timedelta
33from logging import basicConfig , getLevelName , getLogger
4- from typing import List
4+ from typing import List , Optional
55
66import pytz
77from pycron import is_now
@@ -64,15 +64,15 @@ async def schedules_updater(
6464 await asyncio .sleep (scheduler .refresh_delay )
6565
6666
67- def should_run (task : ScheduledTask ) -> bool :
67+ def get_task_delay (task : ScheduledTask ) -> Optional [ int ]: # noqa: C901
6868 """
69- Checks if it's time to run a task .
69+ Get delay of the task in seconds .
7070
7171 :param task: task to check.
7272 :return: True if task must be sent.
7373 """
74+ now = datetime .now (tz = pytz .UTC )
7475 if task .cron is not None :
75- now = datetime .now (tz = pytz .UTC )
7676 # If user specified cron offset we apply it.
7777 # If it's timedelta, we simply add the delta to current time.
7878 if task .cron_offset and isinstance (task .cron_offset , timedelta ):
@@ -81,10 +81,43 @@ def should_run(task: ScheduledTask) -> bool:
8181 # offset and then apply.
8282 elif task .cron_offset and isinstance (task .cron_offset , str ):
8383 now = now .astimezone (pytz .timezone (task .cron_offset ))
84- return is_now (task .cron , now )
85- if task .time is not None :
86- return to_tz_aware (task .time ) <= datetime .now (tz = pytz .UTC )
87- return False
84+ if is_now (task .cron , now ):
85+ return 0
86+ return None
87+ elif task .time is not None :
88+ task_time = to_tz_aware (task .time ).replace (microsecond = 0 )
89+ if task_time <= now :
90+ return 0
91+ one_min_ahead = (now + timedelta (minutes = 1 )).replace (second = 1 , microsecond = 0 )
92+ if task_time <= one_min_ahead :
93+ return int ((task_time - now ).total_seconds ())
94+ return None
95+
96+
97+ async def delayed_send (
98+ scheduler : TaskiqScheduler ,
99+ task : ScheduledTask ,
100+ delay : int ,
101+ ) -> None :
102+ """
103+ Send a task with a delay.
104+
105+ This function waits for some time and then
106+ sends a task.
107+
108+ The main idea is that scheduler gathers
109+ tasks every minute and some of them have
110+ specfic time. To respect the time, we calculate
111+ the delay and send the task after some delay.
112+
113+ :param scheduler: current scheduler.
114+ :param task: task to send.
115+ :param delay: how long to wait.
116+ """
117+ if delay > 0 :
118+ await asyncio .sleep (delay )
119+ logger .info ("Sending task %s." , task .task_name )
120+ await scheduler .on_ready (task )
88121
89122
90123async def _run_loop (scheduler : TaskiqScheduler ) -> None :
@@ -105,17 +138,16 @@ async def _run_loop(scheduler: TaskiqScheduler) -> None:
105138 while True : # noqa: WPS457
106139 for task in tasks :
107140 try :
108- ready = should_run (task )
141+ task_delay = get_task_delay (task )
109142 except ValueError :
110143 logger .warning (
111144 "Cannot parse cron: %s for task: %s" ,
112145 task .cron ,
113146 task .task_name ,
114147 )
115148 continue
116- if ready :
117- logger .info ("Sending task %s." , task .task_name )
118- loop .create_task (scheduler .on_ready (task ))
149+ if task_delay is not None :
150+ loop .create_task (delayed_send (scheduler , task , task_delay ))
119151
120152 delay = (
121153 datetime .now ().replace (second = 1 , microsecond = 0 )
0 commit comments