forked from celery/django-celery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathviews.py
120 lines (87 loc) · 3.68 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from django.http import HttpResponse, Http404
from anyjson import serialize as JSON_dump
from celery.app import default_app
from celery.utils import get_full_cls_name
from celery.result import AsyncResult
from celery.registry import tasks
from celery.utils.functional import wraps
# Ensure built-in tasks are loaded for task_list view
import celery.task
def task_view(task):
"""Decorator turning any task into a view that applies the task
asynchronously. Keyword arguments (via URLconf, etc.) will
supercede GET or POST parameters when there are conflicts.
Returns a JSON dictionary containing the keys ``ok``, and
``task_id``.
"""
def _applier(request, **options):
kwargs = request.method == "POST" and \
request.POST.copy() or request.GET.copy()
kwargs = dict((key.encode("utf-8"), value)
for key, value in kwargs.items())
kwargs.update(options)
result = task.apply_async(kwargs=kwargs)
response_data = {"ok": "true", "task_id": result.task_id}
return HttpResponse(JSON_dump(response_data),
mimetype="application/json")
return _applier
def apply(request, task_name):
"""View applying a task.
**Note:** Please use this with caution. Preferably you shouldn't make this
publicly accessible without ensuring your code is safe!
"""
try:
task = tasks[task_name]
except KeyError:
raise Http404("apply: no such task")
return task_view(task)(request)
def is_task_successful(request, task_id):
"""Returns task execute status in JSON format."""
response_data = {"task": {"id": task_id,
"executed": AsyncResult(task_id).successful()}}
return HttpResponse(JSON_dump(response_data), mimetype="application/json")
def task_status(request, task_id):
"""Returns task status and result in JSON format."""
status = default_app.backend.get_status(task_id)
res = default_app.backend.get_result(task_id)
response_data = dict(id=task_id, status=status, result=res)
if status in default_app.backend.EXCEPTION_STATES:
traceback = default_app.backend.get_traceback(task_id)
response_data.update({"result": repr(res),
"exc": get_full_cls_name(res.__class__),
"traceback": traceback})
return HttpResponse(JSON_dump({"task": response_data}),
mimetype="application/json")
def registered_tasks(request):
"""
A view returning all defined tasks as a JSON object.
"""
response_data = {"regular": tasks.regular().keys(),
"periodic": tasks.periodic().keys()}
return HttpResponse(JSON_dump(response_data), mimetype="application/json")
def task_webhook(fun):
"""Decorator turning a function into a task webhook.
If an exception is raised within the function, the decorated
function catches this and returns an error JSON response, otherwise
it returns the result as a JSON response.
Example:
.. code-block:: python
@task_webhook
def add(request):
x = int(request.GET["x"])
y = int(request.GET["y"])
return x + y
>>> response = add(request)
>>> response.content
'{"status": "success", "retval": 100}'
"""
@wraps(fun)
def _inner(*args, **kwargs):
try:
retval = fun(*args, **kwargs)
except Exception, exc:
response = {"status": "failure", "reason": str(exc)}
else:
response = {"status": "success", "retval": retval}
return HttpResponse(JSON_dump(response), mimetype="application/json")
return _inner