Skip to content

Commit

Permalink
Bug 1524845 - download data about previous crons, actions in parallel…
Browse files Browse the repository at this point in the history
… r=aki

Differential Revision: https://phabricator.services.mozilla.com/D19086

--HG--
extra : moz-landing-system : lando
  • Loading branch information
djmitche committed Feb 8, 2019
1 parent 9074284 commit 9662aae
Showing 1 changed file with 48 additions and 28 deletions.
76 changes: 48 additions & 28 deletions taskcluster/taskgraph/actions/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import copy
import logging
import os
Expand All @@ -25,6 +26,7 @@
get_artifact,
list_tasks,
parse_time,
CONCURRENCY,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,34 +64,52 @@ def fetch_graph_and_labels(parameters, graph_config):
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")

# Now fetch any modifications made by action tasks and swap out new tasks
# for old ones
namespace = '{}.v2.{}.pushlog-id.{}.actions'.format(
graph_config['trust-domain'],
parameters['project'],
parameters['pushlog_id'])
for task_id in list_tasks(namespace):
logger.info('fetching label-to-taskid.json for action task {}'.format(task_id))
try:
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
label_to_taskid.update(run_label_to_id)
except HTTPError as e:
logger.debug('No label-to-taskid.json found for {}: {}'.format(task_id, e))
continue

# Similarly for cron tasks..
namespace = '{}.v2.{}.revision.{}.cron'.format(
graph_config['trust-domain'],
parameters['project'],
parameters['head_rev'])
for task_id in list_tasks(namespace):
logger.info('fetching label-to-taskid.json for cron task {}'.format(task_id))
try:
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
label_to_taskid.update(run_label_to_id)
except HTTPError as e:
logger.debug('No label-to-taskid.json found for {}: {}'.format(task_id, e))
continue
# fetch everything in parallel; this avoids serializing any delay in downloading
# each artifact (such as waiting for the artifact to be mirrored locally)
with futures.ThreadPoolExecutor(CONCURRENCY) as e:
fetches = []

# fetch any modifications made by action tasks and swap out new tasks
# for old ones
def fetch_action(task_id):
    """Merge one action task's label-to-taskid.json into label_to_taskid.

    A 404 response for the artifact is logged at debug level and otherwise
    ignored; any other HTTPError is re-raised to the caller.
    """
    logger.info('fetching label-to-taskid.json for action task {}'.format(task_id))
    try:
        action_labels = get_artifact(task_id, "public/label-to-taskid.json")
    except HTTPError as exc:
        if exc.response.status_code == 404:
            # this task simply did not publish the artifact
            logger.debug('No label-to-taskid.json found for {}: {}'.format(task_id, exc))
        else:
            raise
    else:
        label_to_taskid.update(action_labels)

namespace = '{}.v2.{}.pushlog-id.{}.actions'.format(
graph_config['trust-domain'],
parameters['project'],
parameters['pushlog_id'])
for task_id in list_tasks(namespace):
fetches.append(e.submit(fetch_action, task_id))

# Similarly for cron tasks..
def fetch_cron(task_id):
    """Merge one cron task's label-to-taskid.json into label_to_taskid.

    A 404 response for the artifact is logged at debug level and otherwise
    ignored; any other HTTPError is re-raised to the caller.
    """
    logger.info('fetching label-to-taskid.json for cron task {}'.format(task_id))
    try:
        cron_labels = get_artifact(task_id, "public/label-to-taskid.json")
    except HTTPError as exc:
        if exc.response.status_code == 404:
            # this task simply did not publish the artifact
            logger.debug('No label-to-taskid.json found for {}: {}'.format(task_id, exc))
        else:
            raise
    else:
        label_to_taskid.update(cron_labels)

namespace = '{}.v2.{}.revision.{}.cron'.format(
graph_config['trust-domain'],
parameters['project'],
parameters['head_rev'])
for task_id in list_tasks(namespace):
fetches.append(e.submit(fetch_cron, task_id))

# now wait for each fetch to complete, raising an exception if there
# were any issues
for f in futures.as_completed(fetches):
f.result()

return (decision_task_id, full_task_graph, label_to_taskid)

Expand Down

0 comments on commit 9662aae

Please sign in to comment.