|
3 | 3 | from logging import basicConfig, getLevelName, getLogger |
4 | 4 | from typing import List |
5 | 5 |
|
| 6 | +import pytz |
6 | 7 | from pycron import is_now |
7 | 8 |
|
8 | 9 | from taskiq.cli.scheduler.args import SchedulerArgs |
|
12 | 13 | logger = getLogger(__name__) |
13 | 14 |
|
14 | 15 |
|
| 16 | +def to_tz_aware(time: datetime) -> datetime: |
| 17 | + """ |
| 18 | + Convert datetime to timezone aware. |
| 19 | +
|
| 20 | + This function takes a datetime and if |
| 21 | + timezone was not yet specified, it will |
| 22 | + be set to UTC. |
| 23 | +
|
| 24 | + :param time: time to make timezone aware. |
| 25 | + :return: timezone aware time. |
| 26 | + """ |
| 27 | + if time.tzinfo is None: |
| 28 | + return time.replace(tzinfo=pytz.UTC) |
| 29 | + return time |
| 30 | + |
| 31 | + |
15 | 32 | async def schedules_updater( |
16 | 33 | scheduler: TaskiqScheduler, |
17 | 34 | current_schedules: List[ScheduledTask], |
@@ -55,9 +72,18 @@ def should_run(task: ScheduledTask) -> bool: |
55 | 72 | :return: True if task must be sent. |
56 | 73 | """ |
57 | 74 | if task.cron is not None: |
58 | | - return is_now(task.cron, datetime.utcnow()) |
| 75 | + now = datetime.now(tz=pytz.UTC) |
| 76 | + # If user specified cron offset we apply it. |
| 77 | + # If it's timedelta, we simply add the delta to current time. |
| 78 | + if task.cron_offset and isinstance(task.cron_offset, timedelta): |
| 79 | + now += task.cron_offset |
| 80 | + # If timezone was specified as string we convert it timzone |
| 81 | + # offset and then apply. |
| 82 | + elif task.cron_offset and isinstance(task.cron_offset, str): |
| 83 | + now = now.astimezone(pytz.timezone(task.cron_offset)) |
| 84 | + return is_now(task.cron, now) |
59 | 85 | if task.time is not None: |
60 | | - return task.time <= datetime.utcnow() |
| 86 | + return to_tz_aware(task.time) <= datetime.now(tz=pytz.UTC) |
61 | 87 | return False |
62 | 88 |
|
63 | 89 |
|
|
0 commit comments