Skip to content

Commit

Permalink
Reorganize command-line arguments to match class arguments.
Browse files Browse the repository at this point in the history
--send-events -> --task-events
--schedule -> --schedule-filename
--maxtasksperchid -> --max-tasks-per-child

--maxmemperchild -> --max-memory-per-child (new in 4.0)

Beat(scheduler_cls=) -> Beat(scheduler=)

Worker(send_events=) -> Worker(task_events=)
Worker(task_time_limit=) -> Worker(time_limit=)
Worker(task_soft_time_limit=) -> Worker(task_soft_time_limit=)
Worker(state_db=) -> Worker(statedb=)

Beat(working_directory=) -> Beat(workdir=)
  • Loading branch information
ask committed Aug 4, 2016
1 parent 3a2ea72 commit bbe3b1d
Show file tree
Hide file tree
Showing 19 changed files with 96 additions and 92 deletions.
7 changes: 5 additions & 2 deletions celery/apps/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ class Beat(object):
def __init__(self, max_interval=None, app=None,
socket_timeout=30, pidfile=None, no_color=None,
loglevel='WARN', logfile=None, schedule=None,
scheduler_cls=None, redirect_stdouts=None,
scheduler=None,
scheduler_cls=None, # XXX use scheduler
redirect_stdouts=None,
redirect_stdouts_level=None, **kwargs):
self.app = app = app or self.app
either = self.app.either
self.loglevel = loglevel
self.logfile = logfile
self.schedule = either('beat_schedule_filename', schedule)
self.scheduler_cls = either('beat_scheduler', scheduler_cls)
self.scheduler_cls = either(
'beat_scheduler', scheduler, scheduler_cls)
self.redirect_stdouts = either(
'worker_redirect_stdouts', redirect_stdouts)
self.redirect_stdouts_level = either(
Expand Down
5 changes: 3 additions & 2 deletions celery/apps/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def safe_say(msg):
.> transport: {conninfo}
.> results: {results}
.> concurrency: {concurrency}
.> task events: {events}
[queues]
{queues}
Expand Down Expand Up @@ -194,8 +195,8 @@ def startup_info(self, artlines=True):
pool = pool.__module__
concurrency += ' ({0})'.format(pool.split('.')[-1])
events = 'ON'
if not self.send_events:
events = 'OFF (enable -E to monitor this worker)'
if not self.task_events:
events = 'OFF (enable -E to monitor tasks in this worker)'

banner = BANNER.format(
app=appr,
Expand Down
4 changes: 2 additions & 2 deletions celery/bin/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class Command(object):
Option('-b', '--broker', default=None),
Option('--loader', default=None),
Option('--config', default=None),
Option('--workdir', default=None, dest='working_directory'),
Option('--workdir', default=None),
Option('--no-color', '-C', action='store_true', default=None),
Option('--quiet', '-q', action='store_true'),
)
Expand Down Expand Up @@ -382,7 +382,7 @@ def setup_app_from_commandline(self, argv):
self.no_color = preload_options['no_color']
except KeyError:
pass
workdir = preload_options.get('working_directory')
workdir = preload_options.get('workdir')
if workdir:
os.chdir(workdir)
app = (preload_options.get('app') or
Expand Down
5 changes: 2 additions & 3 deletions celery/bin/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,9 @@ class beat(Command):
supports_args = False

def run(self, detach=False, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, working_directory=None, **kwargs):
gid=None, umask=None, workdir=None, **kwargs):
if not detach:
maybe_drop_privileges(uid=uid, gid=gid)
workdir = working_directory
kwargs.pop('app', None)
beat = partial(self.app.Beat,
logfile=logfile, pidfile=pidfile, **kwargs)
Expand All @@ -112,7 +111,7 @@ def prepare_arguments(self, parser):
parser.add_option('--detach', action='store_true')
parser.add_option('-s', '--schedule', default=c.beat_schedule_filename)
parser.add_option('--max-interval', type='float')
parser.add_option('-S', '--scheduler', dest='scheduler_cls')
parser.add_option('-S', '--scheduler')
parser.add_option('-l', '--loglevel', default='WARN')
daemon_options(parser, default_pidfile='celerybeat.pid')
parser.add_options(self.app.user_options['beat'])
Expand Down
19 changes: 8 additions & 11 deletions celery/bin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,23 +826,20 @@ class shell(Command): # pragma: no cover

option_list = Command.option_list + (
Option('--ipython', '-I',
action='store_true', dest='force_ipython',
help='force iPython.'),
action='store_true', help='force iPython.'),
Option('--bpython', '-B',
action='store_true', dest='force_bpython',
help='force bpython.'),
action='store_true', help='force bpython.'),
Option('--python', '-P',
action='store_true', dest='force_python',
help='force default Python shell.'),
action='store_true', help='force default Python shell.'),
Option('--without-tasks', '-T', action='store_true',
help="don't add tasks to locals."),
Option('--eventlet', action='store_true',
help='use eventlet.'),
Option('--gevent', action='store_true', help='use gevent.'),
)

def run(self, force_ipython=False, force_bpython=False,
force_python=False, without_tasks=False, eventlet=False,
def run(self, ipython=False, bpython=False,
python=False, without_tasks=False, eventlet=False,
gevent=False, **kwargs):
sys.path.insert(0, os.getcwd())
if eventlet:
Expand Down Expand Up @@ -872,11 +869,11 @@ def run(self, force_ipython=False, force_bpython=False,
if not task.name.startswith('celery.')
})

if force_python:
if python:
return self.invoke_fallback_shell()
elif force_bpython:
elif bpython:
return self.invoke_bpython_shell()
elif force_ipython:
elif ipython:
return self.invoke_ipython_shell()
return self.invoke_default_shell()

Expand Down
8 changes: 4 additions & 4 deletions celery/bin/celeryd_detach.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@


def detach(path, argv, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, working_directory=None, fake=False, app=None,
gid=None, umask=None, workdir=None, fake=False, app=None,
executable=None, hostname=None):
hostname = default_nodename(hostname)
logfile = node_format(logfile, hostname)
pidfile = node_format(pidfile, hostname)
fake = 1 if C_FAKEFORK else fake
with detached(logfile, pidfile, uid, gid, umask, working_directory, fake,
with detached(logfile, pidfile, uid, gid, umask, workdir, fake,
after_forkers=False):
try:
if executable is not None:
Expand Down Expand Up @@ -160,11 +160,11 @@ def execute_from_commandline(self, argv=None):

def prepare_arguments(self, parser):
daemon_options(parser, default_pidfile='celeryd.pid')
parser.add_option('--workdir', default=None, dest='working_directory')
parser.add_option('--workdir', default=None)
parser.add_option('-n', '--hostname')
parser.add_option(
'--fake',
default=False, action='store_true', dest='fake',
default=False, action='store_true',
help="Don't fork (for debugging purposes)",
)

Expand Down
7 changes: 3 additions & 4 deletions celery/bin/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class events(Command):
def run(self, dump=False, camera=None, frequency=1.0, maxrate=None,
loglevel='INFO', logfile=None, prog_name='celery events',
pidfile=None, uid=None, gid=None, umask=None,
working_directory=None, detach=False, **kwargs):
workdir=None, detach=False, **kwargs):
self.prog_name = prog_name

if dump:
Expand All @@ -114,7 +114,7 @@ def run(self, dump=False, camera=None, frequency=1.0, maxrate=None,
loglevel=loglevel, logfile=logfile,
pidfile=pidfile, uid=uid, gid=gid,
umask=umask,
working_directory=working_directory,
workdir=workdir,
detach=detach)
return self.run_evtop()

Expand All @@ -129,10 +129,9 @@ def run_evtop(self):
return evtop(app=self.app)

def run_evcam(self, camera, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, working_directory=None,
gid=None, umask=None, workdir=None,
detach=False, **kwargs):
from celery.events.snapshot import evcam
workdir = working_directory
self.set_process_status('cam')
kwargs['app'] = self.app
cam = partial(evcam, camera,
Expand Down
42 changes: 18 additions & 24 deletions celery/bin/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
Path to the state database. The extension '.db' may
be appended to the filename. Default: {default}
.. cmdoption:: -E, --events
.. cmdoption:: -E, --task-events
Send task-related events that can be captured by monitors like
:program:`celery events`, `celerymon`, and others.
Expand Down Expand Up @@ -106,12 +106,12 @@
Enables a soft time limit (in seconds int/float) for tasks.
.. cmdoption:: --maxtasksperchild
.. cmdoption:: --max-tasks-per-child
Maximum number of tasks a pool worker can execute before it's
terminated and replaced by a new worker.
.. cmdoption:: --maxmemperchild
.. cmdoption:: --max-memory-per-child
Maximum amount of resident memory, in KiB, that may be consumed by a
child process before it will be replaced by a new one. If a single
Expand Down Expand Up @@ -216,7 +216,7 @@ def maybe_detach(self, argv, dopts=['-D', '--detach']):
raise SystemExit(0)

def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
loglevel=None, logfile=None, pidfile=None, state_db=None,
loglevel=None, logfile=None, pidfile=None, statedb=None,
**kwargs):
maybe_drop_privileges(uid=uid, gid=gid)
# Pools like eventlet/gevent needs to patch libs as early
Expand All @@ -239,7 +239,7 @@ def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
logfile=logfile, # node format handled by celery.app.log.setup
pidfile=self.node_format(pidfile, hostname),
state_db=self.node_format(state_db, hostname), **kwargs
statedb=self.node_format(statedb, hostname), **kwargs
)
worker.start()
return worker.exitcode
Expand All @@ -257,14 +257,13 @@ def prepare_arguments(self, parser):
wopts.add_option('-D', '--detach', action='store_true')
wopts.add_option(
'-S', '--statedb',
default=conf.worker_state_db, dest='state_db',
default=conf.worker_state_db)
)
wopts.add_option('-l', '--loglevel', default='WARN')
wopts.add_option('-O', dest='optimization')
wopts.add_option(
'--prefetch-multiplier',
dest='prefetch_multiplier', type='int',
default=conf.worker_prefetch_multiplier,
type='int', default=conf.worker_prefetch_multiplier,
)
parser.add_option_group(wopts)

Expand All @@ -275,32 +274,27 @@ def prepare_arguments(self, parser):
)
topts.add_option(
'-P', '--pool',
default=conf.worker_pool, dest='pool_cls',
default=conf.worker_pool,
)
topts.add_option(
'-E', '--events',
default=conf.worker_send_task_events,
action='store_true', dest='send_events',
'-E', '--task-events', '--events',
action='store_true', default=conf.worker_send_task_events,
)
topts.add_option(
'--time-limit',
type='float', dest='task_time_limit',
default=conf.task_time_limit,
type='float', default=conf.task_time_limit,
)
topts.add_option(
'--soft-time-limit',
dest='task_soft_time_limit', type='float',
default=conf.task_soft_time_limit,
type='float', default=conf.task_soft_time_limit,
)
topts.add_option(
'--maxtasksperchild',
dest='max_tasks_per_child', type='int',
default=conf.worker_max_tasks_per_child,
'--max-tasks-per-child', '--maxtasksperchild',
type='int', default=conf.worker_max_tasks_per_child,
)
topts.add_option(
'--maxmemperchild',
dest='max_memory_per_child', type='int',
default=conf.worker_max_memory_per_child,
'--max-memory-per-child', '--maxmemperchild',
type='int', default=conf.worker_max_memory_per_child,
)
parser.add_option_group(topts)

Expand Down Expand Up @@ -332,10 +326,10 @@ def prepare_arguments(self, parser):
bopts = OptionGroup(parser, 'Embedded Beat Options')
bopts.add_option('-B', '--beat', action='store_true')
bopts.add_option(
'-s', '--schedule', dest='schedule_filename',
'-s', '--schedule-filename', '--schedule',
default=conf.beat_schedule_filename,
)
bopts.add_option('--scheduler', dest='scheduler_cls')
bopts.add_option('--scheduler')
parser.add_option_group(bopts)

user_options = self.app.user_options['worker']
Expand Down
2 changes: 1 addition & 1 deletion celery/tests/bin/test_celeryd_detach.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_execute_from_commandline(self, detach, exit):
detach.assert_called_with(
path=x.execv_path, uid=None, gid=None,
umask=None, fake=False, logfile='/var/log', pidfile='celeryd.pid',
working_directory=None, executable=None, hostname=None,
workdir=None, executable=None, hostname=None,
argv=x.execv_argv + [
'-c', '1', '-lDEBUG',
'--logfile=/var/log', '--pidfile=celeryd.pid',
Expand Down
2 changes: 1 addition & 1 deletion celery/tests/bin/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def test_startup_info(self, stdout, stderr):
self.assertTrue(worker.startup_info())

self.app.loader = prev_loader
worker.send_events = True
worker.task_events = True
self.assertTrue(worker.startup_info())

# test when there are too few output lines
Expand Down
10 changes: 5 additions & 5 deletions celery/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def test_receieve_message(self):
self.assertTrue(self.timer.empty())

def test_start_channel_error(self):
c = self.NoopConsumer(send_events=False, pool=BasePool())
c = self.NoopConsumer(task_events=False, pool=BasePool())
c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
c.channel_errors = (KeyError,)
try:
Expand All @@ -284,7 +284,7 @@ def test_start_channel_error(self):
c.timer and c.timer.stop()

def test_start_connection_error(self):
c = self.NoopConsumer(send_events=False, pool=BasePool())
c = self.NoopConsumer(task_events=False, pool=BasePool())
c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
c.connection_errors = (KeyError,)
try:
Expand Down Expand Up @@ -661,7 +661,7 @@ def raises_KeyError(*args, **kwargs):
self.assertEqual(c.qos.prev, c.qos.value)

init_callback.reset_mock()
c = self.NoopConsumer(send_events=False, init_callback=init_callback)
c = self.NoopConsumer(task_events=False, init_callback=init_callback)
c.qos = _QoS()
c.connection = Connection()
c.loop = Mock(side_effect=socket.error('foo'))
Expand Down Expand Up @@ -886,13 +886,13 @@ def test_start_catches_base_exceptions(self):
worker2.start()
self.assertTrue(sec.stop.call_count)

def test_state_db(self):
def test_statedb(self):
from celery.worker import state
Persistent = state.Persistent

state.Persistent = Mock()
try:
worker = self.create_worker(state_db='statefilename')
worker = self.create_worker(statedb='statefilename')
self.assertTrue(worker._persistence)
finally:
state.Persistent = Persistent
Expand Down
Loading

0 comments on commit bbe3b1d

Please sign in to comment.