Skip to content

Commit

Permalink
[rpc backend] now works with chains
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jun 13, 2013
1 parent cbd8f56 commit 27957b5
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 27 deletions.
7 changes: 4 additions & 3 deletions celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def delay(self, *args, **kwargs):
def apply_async(self, args=None, kwargs=None,
task_id=None, producer=None, connection=None, router=None,
link=None, link_error=None, publisher=None,
add_to_parent=True, **options):
add_to_parent=True, reply_to=None, **options):
"""Apply tasks asynchronously by sending a message.
:keyword args: The positional arguments to pass on to the
Expand Down Expand Up @@ -490,12 +490,13 @@ def apply_async(self, args=None, kwargs=None,
if connection:
producer = app.amqp.TaskProducer(connection)
with app.producer_or_acquire(producer) as P:
extra_properties = self.backend.on_task_call(P, task_id)
self.backend.on_task_call(P, task_id)
task_id = P.publish_task(self.name, args, kwargs,
task_id=task_id,
callbacks=maybe_list(link),
errbacks=maybe_list(link_error),
**dict(options, **extra_properties))
reply_to=reply_to or self.app.oid,
**options)
result = self.AsyncResult(task_id)
if add_to_parent:
parent = get_current_worker_task()
Expand Down
6 changes: 0 additions & 6 deletions celery/backends/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def _create_exchange(self, name, type='direct', persistent=False):

def on_task_call(self, producer, task_id):
maybe_declare(self.binding(producer.channel), retry=True)
return self.extra_properties

def _create_binding(self, task_id):
return self.binding
Expand All @@ -50,8 +49,3 @@ def binding(self):
@cached_property
def oid(self):
return self.app.oid

@cached_property
def extra_properties(self):
return {'reply_to': self.oid}

8 changes: 2 additions & 6 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid

from celery._state import current_app, get_current_worker_task
from celery._state import current_app
from celery.result import AsyncResult, GroupResult
from celery.utils.functional import (
maybe_list, is_list, regen,
Expand Down Expand Up @@ -170,11 +170,7 @@ def freeze(self, _id=None):
except KeyError:
tid = opts['task_id'] = _id or uuid()
if 'reply_to' not in opts:
curtask = get_current_worker_task()
if curtask:
opts['repy_to'] = curtask.request.reply_to
else:
opts['reply_to'] = self.type.app.oid
opts['reply_to'] = self.type.app.oid
return self.AsyncResult(tid)
_freeze = freeze

Expand Down
2 changes: 0 additions & 2 deletions celery/task/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,6 @@ def trace_task(uuid, args, kwargs, request=None):
else:
# callback tasks must be applied before the result is
# stored, so that result.children is populated.
print('CALLBACK OPTS: %r' % ([callback.options for
callback in task_request.callbacks or []], ))
[subtask(callback).apply_async((retval, ))
for callback in task_request.callbacks or []]
if publish_result:
Expand Down
12 changes: 2 additions & 10 deletions celery/tests/backends/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_oid(self):
oid = self.b.oid
oid2 = self.b.oid
self.assertEqual(oid, oid2)
self.assertEqual(oid, self.app.oid)

def test_interface(self):
self.b.on_reply_declare('task_id')
Expand Down Expand Up @@ -47,19 +48,10 @@ def test_many_bindings(self):
def test_create_binding(self):
self.assertEqual(self.b._create_binding('id'), self.b.binding)

def test_extra_properties(self):
self.assertDictEqual(
self.b.extra_properties,
{'reply_to': self.b.oid},
)

def test_on_task_call(self):
with patch('celery.backends.rpc.maybe_declare') as md:
with self.app.amqp.producer_pool.acquire() as prod:
self.assertEqual(
self.b.on_task_call(prod, 'task_id'),
self.b.extra_properties,
)
self.b.on_task_call(prod, 'task_id'),
md.assert_called_with(
self.b.binding(prod.channel),
retry=True,
Expand Down

0 comments on commit 27957b5

Please sign in to comment.