Skip to content

Commit

Permalink
Merge pull request ckan#5081 from DataShades/cli-jobs
Browse files Browse the repository at this point in the history
CLI. jobs
  • Loading branch information
amercader authored Nov 19, 2019
2 parents 0378262 + ab7636b commit 68aa531
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ckan/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import click
from ckan.cli import config_tool
from ckan.cli import (
jobs,
datapusher,
front_end_build,
click_config_option, db, load_config, search_index, server,
Expand Down Expand Up @@ -42,6 +43,7 @@ def ckan(ctx, config, *args, **kwargs):
ctx.obj = CkanCommand(config)


ckan.add_command(jobs.jobs)
ckan.add_command(config_tool.config_tool)
ckan.add_command(front_end_build.front_end_build)
ckan.add_command(server.run)
Expand Down
131 changes: 131 additions & 0 deletions ckan/cli/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# encoding: utf-8

import click

import ckan.lib.jobs as bg_jobs
import ckan.logic as logic
import ckan.plugins as p
from ckan.cli import error_shout


@click.group(name=u"jobs", short_help=u"Manage background jobs.")
def jobs():
pass


@jobs.command(short_help=u"Start a worker.",)
@click.option(u"--burst", is_flag=True, help=u"Start worker in burst mode.")
@click.argument(u"queues", nargs=-1)
def worker(burst, queues):
"""Start a worker that fetches jobs from queues and executes them. If
no queue names are given then the worker listens to the default
queue, this is equivalent to
paster jobs worker default
If queue names are given then the worker listens to those queues
and only those:
paster jobs worker my-custom-queue
Hence, if you want the worker to listen to the default queue and
some others then you must list the default queue explicitly:
paster jobs worker default my-custom-queue
If the `--burst` option is given then the worker will exit as soon
as all its queues are empty.
"""
bg_jobs.Worker(queues).work(burst=burst)


@jobs.command(name=u"list", short_help=u"List jobs.")
@click.argument(u"queues", nargs=-1)
def list_jobs(queues):
"""List currently enqueued jobs from the given queues. If no queue
names are given then the jobs from all queues are listed.
"""
data_dict = {
u"queues": list(queues),
}
jobs = p.toolkit.get_action(u"job_list")({u"ignore_auth": True}, data_dict)
if not jobs:
return click.secho(u"There are no pending jobs.", fg=u"green")
for job in jobs:
if job[u"title"] is None:
job[u"title"] = u""
else:
job[u"title"] = u'"{}"'.format(job[u"title"])
click.secho(u"{created} {id} {queue} {title}".format(**job))


@jobs.command(short_help=u"Show details about a specific job.")
@click.argument(u"id")
def show(id):
try:
job = p.toolkit.get_action(u"job_show")(
{u"ignore_auth": True}, {u"id": id}
)
except logic.NotFound:
return error_shout(u'There is no job with ID "{}"'.format(id))
click.secho(u"ID: {}".format(job[u"id"]))
if job[u"title"] is None:
title = u"None"
else:
title = u'"{}"'.format(job[u"title"])
click.secho(u"Title: {}".format(title))
click.secho(u"Created: {}".format(job[u"created"]))
click.secho(u"Queue: {}".format(job[u"queue"]))


@jobs.command(short_help=u"Cancel a specific job.")
@click.argument(u"id")
def cancel(id):
"""Cancel a specific job. Jobs can only be canceled while they are
enqueued. Once a worker has started executing a job it cannot be
aborted anymore.
"""
try:
p.toolkit.get_action(u"job_cancel")(
{u"ignore_auth": True}, {u"id": id}
)
except logic.NotFound:
return error_shout(u'There is no job with ID "{}"'.format(id))

click.secho(u"Cancelled job {}".format(id), fg=u"green")


@jobs.command(short_help=u"Cancel all jobs.")
@click.argument(u"queues", nargs=-1)
def clear(queues):
"""Cancel all jobs on the given queues. If no queue names are given
then ALL queues are cleared.
"""
data_dict = {
u"queues": list(queues),
}
queues = p.toolkit.get_action(u"job_clear")(
{u"ignore_auth": True}, data_dict
)
queues = (u'"{}"'.format(q) for q in queues)
click.secho(u"Cleared queue(s) {}".format(u", ".join(queues)), fg=u"green")


@jobs.command(short_help=u"Enqueue a test job.")
@click.argument(u"queues", nargs=-1)
def test(queues):
"""Enqueue a test job. If no queue names are given then the job is
added to the default queue. If queue names are given then a
separate test job is added to each of the queues.
"""
for queue in queues or [bg_jobs.DEFAULT_QUEUE_NAME]:
job = bg_jobs.enqueue(
bg_jobs.test_job, [u"A test job"], title=u"A test job", queue=queue
)
click.secho(
u'Added test job {} to queue "{}"'.format(job.id, queue),
fg=u"green",
)

0 comments on commit 68aa531

Please sign in to comment.