This repository was archived by the owner on Feb 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask_scheduler.py
135 lines (114 loc) · 4.81 KB
/
task_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import asyncio
import datetime
import logging
from typing import Callable
import sentry_sdk
from nng_sdk.postgres.exceptions import ItemNotFoundException
from nng_sdk.pydantic_models.startup import Startup
from background_tasks.base_task import BaseTask
class TaskScheduler(BaseTask):
    """Run registered callables at a daily clock time or every N days.

    Two kinds of tasks can be registered:

    * "by time" tasks, executed at a fixed ``HH:MM`` of the local day;
    * "interval" tasks, executed every ``interval_days`` days, whose last
      execution time is persisted through ``self.postgres.startup``.

    ``self.postgres`` and ``self.logging`` are not defined in this class;
    presumably they are provided by ``BaseTask`` (verify against the base
    class).
    """

    def __init__(self):
        super().__init__()
        # Entries: (task_name, datetime parsed from "%H:%M", task_func).
        self.by_time_tasks = []
        # Entries: (task_name, interval_days, task_func, run_at_startup).
        self.interval_tasks = []

    def add_by_time_task(
        self, task_name: str, execution_time: str, task_func: Callable
    ):
        """Register a task that runs at a fixed time of day.

        Args:
            task_name: Key used to identify the task in logs and schedules.
            execution_time: Time of day in ``"%H:%M"`` format.
            task_func: Callable invoked with no arguments.

        Raises:
            ValueError: If ``execution_time`` does not match ``"%H:%M"``.
        """
        self.by_time_tasks.append(
            (task_name, datetime.datetime.strptime(execution_time, "%H:%M"), task_func)
        )

    def add_interval_task(
        self,
        task_name: str,
        interval_days: int,
        task_func: Callable,
        run_at_startup: bool = True,
    ):
        """Register a task that runs every ``interval_days`` days.

        Args:
            task_name: Key used for logs and for the persisted startup record.
            interval_days: Number of days between executions.
            task_func: Callable invoked with no arguments.
            run_at_startup: When no previous execution is recorded, run the
                task immediately on the first ``run()`` pass instead of
                waiting a full interval.
        """
        self.interval_tasks.append(
            (task_name, interval_days, task_func, run_at_startup)
        )

    def get_startup_or_none(self, task_name: str) -> datetime.datetime | None:
        """Return the recorded ``time_log`` for ``task_name``, or ``None``.

        ``None`` is returned when the lookup raises ``ItemNotFoundException``.
        """
        try:
            return self.postgres.startup.get_startup_for_service(task_name).time_log
        except ItemNotFoundException:
            return None

    async def run(self):
        """Perform one scheduling pass over every registered task.

        Computes a target datetime for each task, then walks the tasks in
        chronological order, sleeping until each target and executing it.
        The method returns after every task has been executed once.
        """
        tasks_and_target_datetime = {}
        tasks_and_callables = {}
        tasks_and_should_log = {}

        now = datetime.datetime.now()

        for task_name, execution_time, task_func in self.by_time_tasks:
            next_execution_time = datetime.datetime(
                now.year, now.month, now.day, execution_time.hour, execution_time.minute
            )
            # Today's slot has already passed (or is exactly now): use tomorrow's.
            if next_execution_time <= now:
                next_execution_time += datetime.timedelta(days=1)

            tasks_and_target_datetime[task_name] = next_execution_time
            tasks_and_callables[task_name] = task_func
            # By-time tasks do not persist their execution time.
            tasks_and_should_log[task_name] = False

        for index, (task_name, interval_days, task_func, run_at_startup) in enumerate(
            self.interval_tasks
        ):
            time_log: datetime.datetime | None = self.get_startup_or_none(task_name)
            if time_log:
                target_datetime = time_log + datetime.timedelta(days=interval_days)
                # An overdue task is postponed by a full interval from now
                # rather than executed immediately.
                # NOTE(review): confirm this postponement is intended.
                if target_datetime < now:
                    target_datetime = now + datetime.timedelta(days=interval_days)
                tasks_and_target_datetime[task_name] = target_datetime
            elif run_at_startup:
                tasks_and_target_datetime[task_name] = now
            else:
                tasks_and_target_datetime[task_name] = now + datetime.timedelta(
                    days=interval_days
                )

            tasks_and_callables[task_name] = task_func
            # Interval tasks record their execution time after running.
            tasks_and_should_log[task_name] = True
            # Clear the startup flag so later passes do not treat the task as
            # a first start again.
            self.interval_tasks[index] = (task_name, interval_days, task_func, False)

        # Execute in chronological order of the target time.
        tasks_and_target_datetime = dict(
            sorted(tasks_and_target_datetime.items(), key=lambda item: item[1])
        )

        for task_name, target_datetime in tasks_and_target_datetime.items():
            delta_time = target_datetime - datetime.datetime.now()
            if delta_time.total_seconds() <= 0:
                logging.info(f"запускаю задачу {task_name}")
            else:
                logging.info(
                    f"следующая задача {task_name} запустится {target_datetime.strftime('%d.%m.%Y %H:%M')}"
                )
                # Recompute the remaining time right before sleeping.
                await asyncio.sleep(
                    (target_datetime - datetime.datetime.now()).total_seconds()
                )

            try:
                self._run_task(
                    task_name,
                    tasks_and_callables[task_name],
                    tasks_and_should_log[task_name],
                )
            except Exception as e:
                # _run_task guards the task itself; this catches failures from
                # the progress bookkeeping so the remaining tasks still run.
                sentry_sdk.capture_exception(e)
                logging.exception(e)

    def save_progress(self, task_name: str):
        """Persist the current time as the last execution of ``task_name``."""
        self.postgres.startup.upload_startup(
            Startup(service_name=task_name, time_log=datetime.datetime.now())
        )

    def _run_task(self, task_name: str, task_func: Callable, log: bool):
        """Execute ``task_func`` and optionally record its execution time.

        ``task_func`` is called synchronously with no arguments and its return
        value is discarded. Exceptions raised by the task are reported to
        Sentry and logged, not propagated; progress is saved regardless of
        whether the task succeeded.

        Args:
            task_name: Name used in log messages and in the saved record.
            task_func: Callable to execute.
            log: When true, store the execution time via ``save_progress``.
        """
        try:
            task_func()
            self.logging.info(f"задача {task_name} завершена")
        except Exception as e:
            sentry_sdk.capture_exception(e)
            self.logging.exception(e)
            # ``warn`` is a deprecated alias; ``warning`` is the supported name.
            self.logging.warning(f"задача {task_name} завершена с ошибками")

        if log:
            self.save_progress(task_name)
            self.logging.info("прогресс записан в бд")