-
-
Notifications
You must be signed in to change notification settings - Fork 74
/
Copy pathtest_label_based.py
59 lines (49 loc) · 1.55 KB
/
test_label_based.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from datetime import datetime, timezone
from typing import Any, Dict, List

import pytest

from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.schedule_sources.label_based import LabelScheduleSource
from taskiq.scheduler.scheduled_task import ScheduledTask


@pytest.mark.anyio
@pytest.mark.parametrize(
    "schedule_label",
    [
        pytest.param([{"cron": "* * * * *"}], id="cron"),
        # Timezone-aware UTC timestamp: ``datetime.utcnow()`` is deprecated
        # since Python 3.12 and returns a naive value.
        pytest.param([{"time": datetime.now(timezone.utc)}], id="time"),
        pytest.param([{"period": 1.0}], id="period"),
    ],
)
async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None:
    """
    Check that a ``schedule`` label on a task is returned by the source.

    Each parametrized case registers one task on an in-memory broker with a
    single-entry schedule (``cron``, ``time`` or ``period``) and expects
    ``LabelScheduleSource.get_schedules`` to return exactly one matching
    ``ScheduledTask``.

    :param schedule_label: schedule entries passed to ``broker.task``.
    """
    broker = InMemoryBroker()

    @broker.task(
        task_name="test_task",
        schedule=schedule_label,
    )
    def task() -> None:
        pass

    source = LabelScheduleSource(broker)
    schedules = await source.get_schedules()

    # The expected value below indexes ``schedules[0]``; assert the length
    # first so a discovery failure is reported as a readable assertion
    # error rather than an IndexError.
    assert len(schedules) == 1

    entry = schedule_label[0]
    assert schedules == [
        ScheduledTask(
            # The id is copied from the result itself, so it never
            # contributes to a mismatch; only the other fields are compared.
            schedule_id=schedules[0].schedule_id,
            # Keys absent from the entry resolve to None via ``dict.get``.
            cron=entry.get("cron"),
            time=entry.get("time"),
            period=entry.get("period"),
            task_name="test_task",
            labels={"schedule": schedule_label},
            args=[],
            kwargs={},
        ),
    ]


@pytest.mark.anyio
async def test_label_discovery_no_cron() -> None:
    """Check that a schedule entry holding only ``args`` yields no schedules."""
    in_memory_broker = InMemoryBroker()

    # The single schedule entry contains only an "args" key -- no "cron".
    args_only_schedule = [{"args": ["* * * * *"]}]

    @in_memory_broker.task(task_name="test_task", schedule=args_only_schedule)
    def task() -> None:
        pass

    discovered = await LabelScheduleSource(in_memory_broker).get_schedules()
    assert discovered == []