Skip to content

Commit

Permalink
Bug 1252948: support for periodic taskgraphs; r=Callek,jonasfj,kmoir
Browse files Browse the repository at this point in the history
This adds `.cron.yml` and a new mach command to interpret it.  While
functionality is limited to nightlies right now, there is room to expand to
more diverse periodic tasks.  Let your imagination run wild!

MozReview-Commit-ID: KxQkaUbsjQs

--HG--
extra : rebase_source : ddf0a1eadae5a1169c0ead7bcb7b9ce61b255fbf
  • Loading branch information
djmitche committed Jan 18, 2017
1 parent 17794b1 commit b8d8f8a
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 8 deletions.
29 changes: 29 additions & 0 deletions .cron.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Definitions for jobs that run periodically. For details on the format, see
# `taskcluster/taskgraph/cron/schema.py`. For documentation, see
# `taskcluster/docs/cron.rst`.

jobs:
- name: nightly-desktop
job:
type: decision-task
treeherder-symbol: Nd
triggered-by: nightly
target-tasks-method: nightly_linux
projects:
- mozilla-central
- date
when:
- {hour: 16, minute: 0}

- name: nightly-android
job:
type: decision-task
treeherder-symbol: Na
triggered-by: nightly
target-tasks-method: nightly_fennec
projects:
- mozilla-central
- date
when:
- {hour: 16, minute: 0}

11 changes: 9 additions & 2 deletions .taskcluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ scopes:
- docker-worker:*
- scheduler:*

# Available mustache parameters (see the mozilla-taskcluster source):
# This file undergoes substitution to create tasks. For on-push tasks, that
# substitution is done by mozilla-taskcluster. For cron tasks, that substitution
# is done by `taskcluster/taskgraph/cron/decision.py`. If you change any of the
# template parameters, please do so in all three places!
#
# Available template parameters:
#
# - now: current time
# - owner: push user (email address)
# - source: URL of this YAML file
# - url: repository URL
Expand All @@ -29,6 +35,7 @@ scopes:
# and functions:
# - as_slugid: convert a label into a slugId
# - from_now: generate a timestamp at a fixed offset from now
# - shellquote: quote the contents for injection into shell

# The resulting tasks' taskGroupId will be equal to the taskId of the first
# task listed here, which should be the decision task. This gives other tools
Expand All @@ -42,7 +49,7 @@ tasks:
deadline: '{{#from_now}}1 day{{/from_now}}'
expires: '{{#from_now}}365 day{{/from_now}}'
metadata:
owner: [email protected]
owner: {{owner}}
source: {{{source}}}
name: "Gecko Decision Task"
description: |
Expand Down
48 changes: 48 additions & 0 deletions taskcluster/docs/cron.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
Periodic Taskgraphs
===================

The cron functionality allows in-tree scheduling of task graphs that run
periodically, instead of on a push.

How It Works
------------

The `TaskCluster Hooks Service <https://tools.taskcluster.net/hooks>`_ has a
hook configured for each repository supporting periodic task graphs. The hook
runs every 15 minutes, and the resulting task is referred to as a "cron task".
That cron task runs `./mach taskgraph cron` in a checkout of the Gecko source
tree.

The mach subcommand reads ``.cron.yml``, then consults the current time
(actually the time the cron task was created, rounded down to the nearest 15
minutes) and creates tasks for any cron jobs scheduled at that time.

Each cron job in ``.cron.yml`` specifies a ``job.using``, corresponding to a
function responsible for creating TaskCluster tasks when the job runs.

Decision Tasks
..............

For ``job.using`` "decision-task", tasks are created based on
``.taskcluster.yml`` just like the decision tasks that result from a push to a
repository. They run with a distinct ``taskGroupId``, and are free to create
additional tasks comprising a task graph.

Scopes
------

The cron task runs with the sum of all cron job scopes for the given repo. For
example, for the "sequoia" project, the scope would be
``assume:repo:hg.mozilla.org/projects/sequoia:cron:*``. Each cron job creates
tasks with scopes for that particular job, by name. For example, the
``check-frob`` cron job on that repo would run with
``assume:repo:hg.mozilla.org/projects/sequoia:cron:check-frob``.

.. important::

The individual cron scopes are a useful check to ensure that a job is not
accidentally doing something it should not, but cannot actually *prevent* a
job from using any of the scopes afforded to the cron task itself (the
``..cron:*`` scope). This is simply because the cron task runs arbitrary
code from the repo, and that code can be easily modified to create tasks
with any scopes that it posesses.
1 change: 1 addition & 0 deletions taskcluster/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ check out the :doc:`how-to section <how-tos>`.
transforms
yaml-templates
docker-images
cron
how-tos
reference
36 changes: 36 additions & 0 deletions taskcluster/mach_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,42 @@ def taskgraph_backfill(self, **options):
traceback.print_exc()
sys.exit(1)

@SubCommand('taskgraph', 'cron',
description="Run the cron task")
@CommandArgument('--base-repository',
required=True,
help='URL for "base" repository to clone')
@CommandArgument('--head-repository',
required=True,
help='URL for "head" repository to fetch')
@CommandArgument('--head-ref',
required=True,
help='Reference to fetch in head-repository (usually "default")')
@CommandArgument('--project',
required=True,
help='Project to use for creating tasks. Example: --project=mozilla-central')
@CommandArgument('--level',
required=True,
help='SCM level of this repository')
@CommandArgument('--force-run',
required=False,
help='If given, force this cronjob to run regardless of time, '
'and run no others')
@CommandArgument('--no-create',
required=False,
action='store_true',
help='Do not actually create tasks')
def taskgraph_cron(self, **options):
"""Run the cron task; this task creates zero or more decision tasks. It is run
from the hooks service on a regular basis."""
import taskgraph.cron
try:
self.setup_logging()
return taskgraph.cron.taskgraph_cron(options)
except Exception:
traceback.print_exc()
sys.exit(1)

def setup_logging(self, quiet=False, verbose=True):
"""
Set up Python logging for all loggers, sending results to stderr (so
Expand Down
6 changes: 3 additions & 3 deletions taskcluster/taskgraph/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,21 @@ def create_tasks(taskgraph, label_to_taskid, params):
for f in futures.as_completed(deps_fs):
f.result()

fs[task_id] = e.submit(_create_task, session, task_id,
fs[task_id] = e.submit(create_task, session, task_id,
taskid_to_label[task_id], task_def)

# Schedule tasks as many times as task_duplicates indicates
for i in range(1, attributes.get('task_duplicates', 1)):
# We use slugid() since we want a distinct task id
fs[task_id] = e.submit(_create_task, session, slugid(),
fs[task_id] = e.submit(create_task, session, slugid(),
taskid_to_label[task_id], task_def)

# Wait for all futures to complete.
for f in futures.as_completed(fs.values()):
f.result()


def _create_task(session, task_id, label, task_def):
def create_task(session, task_id, label, task_def):
# create the task using 'http://taskcluster/queue', which is proxied to the queue service
# with credentials appropriate to this job.

Expand Down
160 changes: 160 additions & 0 deletions taskcluster/taskgraph/cron/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


from __future__ import absolute_import, print_function, unicode_literals

import datetime
import json
import logging
import os
import traceback
import requests
import yaml

from . import decision, schema
from .util import (
match_utc,
calculate_head_rev
)
from ..create import create_task

# Functions to handle each `job.type` in `.cron.yml`. These are called with
# the contents of the `job` property from `.cron.yml` and should return a
# sequence of (taskId, task) tuples which will subsequently be fed to
# createTask.
JOB_TYPES = {
'decision-task': decision.run_decision_task,
}

GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..', '..'))
logger = logging.getLogger(__name__)
_session = None


def get_session():
global _session
if not _session:
_session = requests.Session()
return _session


def load_jobs():
with open(os.path.join(GECKO, '.cron.yml'), 'rb') as f:
cron_yml = yaml.load(f)
schema.validate(cron_yml)
return {j['name']: j for j in cron_yml['jobs']}


def should_run(job, params):
if 'projects' in job:
if not any(p == params['project'] for p in job['projects']):
return False
if not any(match_utc(params, hour=sched.get('hour'), minute=sched.get('minute'))
for sched in job.get('when', [])):
return False
return True


def run_job(job_name, job, params):
params['job_name'] = job_name

try:
job_type = job['job']['type']
if job_type in JOB_TYPES:
tasks = JOB_TYPES[job_type](job['job'], params)
else:
raise Exception("job type {} not recognized".format(job_type))
if params['no_create']:
for task_id, task in tasks:
logger.info("Not creating task {} (--no-create):\n".format(task_id) +
json.dumps(task, sort_keys=True, indent=4, separators=(',', ': ')))
else:
for task_id, task in tasks:
create_task(get_session(), task_id, params['job_name'], task)

except Exception:
# report the exception, but don't fail the whole cron task, as that
# would leave other jobs un-run. NOTE: we could report job failure to
# a responsible person here via tc-notify
traceback.print_exc()
logger.error("cron job {} run failed; continuing to next job".format(params['job_name']))


def calculate_time(options):
if 'TASK_ID' not in os.environ:
# running in a development environment, so look for CRON_TIME or use
# the current time
if 'CRON_TIME' in os.environ:
logger.warning("setting params['time'] based on $CRON_TIME")
time = datetime.datetime.utcfromtimestamp(int(os.environ['CRON_TIME']))
else:
logger.warning("using current time for params['time']; try setting $CRON_TIME "
"to a timestamp")
time = datetime.datetime.utcnow()
else:
# fetch this task from the queue
res = get_session().get('http://taskcluster/queue/v1/task/' + os.environ['TASK_ID'])
if res.status_code != 200:
try:
logger.error(res.json()['message'])
except:
logger.error(res.text)
res.raise_for_status()
# the task's `created` time is close to when the hook ran, although that
# may be some time ago if task execution was delayed
created = res.json()['created']
time = datetime.datetime.strptime(created, '%Y-%m-%dT%H:%M:%S.%fZ')

# round down to the nearest 15m
minute = time.minute - (time.minute % 15)
time = time.replace(minute=minute, second=0, microsecond=0)
logger.info("calculated cron schedule time is {}".format(time))
return time


def taskgraph_cron(options):
params = {
# name of this cron job (set per job below)
'job_name': '..',

# repositories
'base_repository': options['base_repository'],
'head_repository': options['head_repository'],

# the symbolic ref this should run against (which happens to be what
# run-task checked out for us)
'head_ref': options['head_ref'],

# *calculated* head_rev; this is based on the current meaning of this
# reference in the working copy
'head_rev': calculate_head_rev(options),

# the project (short name for the repository) and its SCM level
'project': options['project'],
'level': options['level'],

# if true, tasks will not actually be created
'no_create': options['no_create'],

# the time that this cron task was created (as a UTC datetime object)
'time': calculate_time(options),
}

jobs = load_jobs()

if options['force_run']:
job_name = options['force_run']
logger.info("force-running cron job {}".format(job_name))
run_job(job_name, jobs[job_name], params)
return

for job_name, job in sorted(jobs.items()):
if should_run(job, params):
logger.info("running cron job {}".format(job_name))
run_job(job_name, job, params)
else:
logger.info("not running cron job {}".format(job_name))
Loading

0 comments on commit b8d8f8a

Please sign in to comment.