Skip to content

Commit

Permalink
Merge branch '3.0'
Browse files Browse the repository at this point in the history
Conflicts:
	Changelog
	README.rst
	celery/__init__.py
	docs/includes/introduction.txt
  • Loading branch information
ask committed Jul 20, 2012
2 parents 8af99e4 + 5a1c49b commit 2718b05
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 16 deletions.
68 changes: 66 additions & 2 deletions Changelog
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,77 @@
.. contents::
:local:

3.1.0
=====
:state: DEVEL
:branch: master

- `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
- `App.control.Inspect.conf` can be used for inspecting worker configuration

.. _version-3.0.2:

3.0.2
=====
:release-date: 2012-07-20 04:00 P.M BST
:by: Ask Solem

- `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
- `App.control.Inspect.conf` can be used for inspecting worker configuration
- A bug caused the following task options to not take defaults from the
configuration (Issue #867 + Issue #858)

The following settings were affected:

- :setting:`CELERY_IGNORE_RESULT`
- :setting:`CELERYD_SEND_TASK_ERROR_EMAILS`
- :setting:`CELERY_TRACK_STARTED`
- :setting:`CElERY_STORE_ERRORS_EVEN_IF_IGNORED`

Fix contributed by John Watson.

- Task Request: ``delivery_info`` is now passed through as-is (Issue #807).

- The eta argument now supports datetime's with a timezone set (Issue #855).

- The worker's banner displayed the autoscale settings in the wrong order
(Issue #859).

- Extension commands are now loaded after concurrency is set up
so that they don't interfere with e.g. eventlet patching.

- Fixed bug in the threaded pool (Issue #863)

- The task failure handler mixed up the fields in :func:`sys.exc_info`.

Fix contributed by Rinat Shigapov.

- Fixed typos and wording in the docs.

Fix contributed by Paul McMillan

- New setting: :setting:`CELERY_WORKER_DIRECT`

If enabled each worker will consume from their own dedicated queue
which can be used to route tasks to specific workers.

- Fixed several edge case bugs in the add consumer remote control command.

- :mod:`~celery.contrib.migrate`: Can now filter and move tasks to specific
workers if :setting:`CELERY_WORKER_DIRECT` is enabled.

Among other improvements, the following functions have been added:

* ``move_direct(filterfun, **opts)``
* ``move_direct_by_id(task_id, worker_hostname, **opts)``
* ``move_direct_by_idmap({task_id: worker_hostname, ...}, **opts)``
* ``move_direct_by_taskmap({task_name: worker_hostname, ...}, **opts)``

- :meth:`~celery.Celery.default_connection` now accepts a pool argument that
if set to false causes a new connection to be created instead of acquiring
one from the pool.

- New signal: :signal:`celeryd_after_setup`.

- Default loader now keeps lowercase attributes from the configuration module.

.. _version-3.0.1:

Expand Down
1 change: 1 addition & 0 deletions celery/app/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
. %(name)s exchange:%(exchange)s(%(exchange_type)s) binding:%(routing_key)s
"""


class Queues(dict):
"""Queue name⇒ declaration mapping.
Expand Down
3 changes: 2 additions & 1 deletion celery/apps/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ def startup_info(self):
loader = loader[14:]
appr += ' (%s)' % loader
if self.autoscale:
concurrency = '{min=%s, max=%s}' % tuple(self.autoscale)
max, min = self.autoscale
concurrency = '{min=%s, max=%s}' % (min, max)
pool = self.pool_cls
if not isinstance(pool, basestring):
pool = pool.__module__
Expand Down
1 change: 1 addition & 0 deletions celery/bin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def load_extension_commands(namespace='celery.commands'):
else:
command(cls, name=ep.name)


class Command(BaseCommand):
help = ''
args = ''
Expand Down
2 changes: 1 addition & 1 deletion celery/contrib/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import socket

from functools import partial, wraps
from functools import partial
from itertools import cycle, islice

from kombu import eventloop, Queue
Expand Down
2 changes: 1 addition & 1 deletion celery/tests/worker/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ def test_default_kwargs(self):
'task_id': tw.id,
'task_retries': 0,
'task_is_eager': False,
'delivery_info': {'exchange': None, 'routing_key': None},
'delivery_info': {},
'task_name': tw.name})

@patch('celery.worker.job.logger')
Expand Down
1 change: 1 addition & 0 deletions celery/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ def test_receieve_message_eta_isoformat(self):
l.update_strategies()
l.receive_message(m.decode(), m)
l.timer.stop()
l.timer.join(1)

items = [entry[2] for entry in self.timer.queue]
found = 0
Expand Down
14 changes: 9 additions & 5 deletions celery/utils/timeutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ def tz_or_local(self, tzinfo=None):
return self.get_timezone(tzinfo)

def to_local(self, dt, local=None, orig=None):
return set_tz(dt, orig or self.utc).astimezone(
self.tz_or_local(local))
if is_naive(dt):
dt = set_tz(dt, orig or self.utc)
return dt.astimezone(self.tz_or_local(local))

def get_timezone(self, zone):
if isinstance(zone, basestring):
Expand Down Expand Up @@ -202,10 +203,13 @@ def is_naive(dt):

def set_tz(dt, tz):
"""Sets the timezone for a datetime object."""
if hasattr(tz, 'localize'):
try:
localize = tz.localize
except AttributeError:
return dt.replace(tzinfo=tz)
else:
# works on pytz timezones
return tz.localize(dt, is_dst=None)
return dt.replace(tzinfo=tz)
return localize(dt, is_dst=None)


def to_utc(dt):
Expand Down
7 changes: 1 addition & 6 deletions celery/worker/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,7 @@ def __init__(self, body, on_ack=noop,
else:
self.expires = None

delivery_info = {} if delivery_info is None else delivery_info
self.delivery_info = {
'exchange': delivery_info.get('exchange'),
'routing_key': delivery_info.get('routing_key'),
}

self.delivery_info = delivery_info or {}
self.request_dict = body

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions docs/userguide/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ The request defines the following attributes:
containing the exchange and routing key used to deliver this
task. Used by e.g. :meth:`[email protected]`
to resend the task to the same destination queue.
Availability of keys in this dict depends on the
message broker used.


An example task accessing information in the context is:
Expand Down

0 comments on commit 2718b05

Please sign in to comment.