forked from Flagsmith/flagsmith
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor.py
100 lines (72 loc) · 2.67 KB
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import logging
import traceback
import typing
from django.utils import timezone
from task_processor.models import (
RecurringTask,
RecurringTaskRun,
Task,
TaskResult,
TaskRun,
)
logger = logging.getLogger(__name__)
def run_tasks(num_tasks: int = 1) -> typing.List[TaskRun]:
if num_tasks < 1:
raise ValueError("Number of tasks to process must be at least one")
tasks = Task.objects.get_tasks_to_process(num_tasks)
if tasks:
executed_tasks = []
task_runs = []
for task in tasks:
task, task_run = _run_task(task)
executed_tasks.append(task)
task_runs.append(task_run)
if executed_tasks:
Task.objects.bulk_update(
executed_tasks, fields=["completed", "num_failures", "is_locked"]
)
if task_runs:
TaskRun.objects.bulk_create(task_runs)
return task_runs
logger.debug("No tasks to process.")
return []
def run_recurring_tasks(num_tasks: int = 1) -> typing.List[RecurringTaskRun]:
if num_tasks < 1:
raise ValueError("Number of tasks to process must be at least one")
# NOTE: We will probably see a lot of delay in the execution of recurring tasks
# if the tasks take longer then `run_every` to execute. This is not
# a problem for now, but we should be mindful of this limitation
tasks = RecurringTask.objects.get_tasks_to_process(num_tasks)
if tasks:
task_runs = []
for task in tasks:
# Remove the task if it's not registered anymore
if not task.is_task_registered:
task.delete()
continue
if task.should_execute:
task, task_run = _run_task(task)
task_runs.append(task_run)
else:
task.unlock()
# update all tasks that were not deleted
to_update = [task for task in tasks if task.id]
RecurringTask.objects.bulk_update(to_update, fields=["is_locked"])
if task_runs:
RecurringTaskRun.objects.bulk_create(task_runs)
return task_runs
logger.debug("No tasks to process.")
return []
def _run_task(task: typing.Union[Task, RecurringTask]) -> typing.Tuple[Task, TaskRun]:
task_run = task.task_runs.model(started_at=timezone.now(), task=task)
try:
task.run()
task_run.result = TaskResult.SUCCESS
task_run.finished_at = timezone.now()
task.mark_success()
except Exception as e:
logger.warning(e)
task.mark_failure()
task_run.result = TaskResult.FAILURE
task_run.error_details = str(traceback.format_exc())
return task, task_run