Introduce job prioritization to ExecutionGraph

See the Google doc for details: https://docs.google.com/a/twitter.com/document/d/1BGVAR_AdGXsf08MT0nBLh8s-lYenx7NK7Kk7keD_gwg/edit?usp=sharing

PROBLEM:
Jobs are submitted to the worker pool in roughly insertion order. If the submission order were instead based on the “size” or “importance” of each job, total execution time could be reduced.

SOLUTION:
Introduce a priority queue in which ready-to-be-submitted jobs wait, sorted by “importance”, until a worker becomes available. When a job finishes, its unblocked dependees are not submitted to the worker pool directly but are instead put into the priority queue. When a worker becomes available (i.e. when any job finishes), the most “important” job is taken from the priority queue. This adds a small amount of latency: previously, ready jobs were submitted to the worker pool immediately; now they are first put into the priority queue, and only then are the highest-priority ones chosen and submitted.
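
For example, with the diamond A -> {B, C} -> D and sizes 8/4/2/1, the priorities computed along the critical path are A: 13, B: 5, C: 3, D: 1 (see test_priorities_for_diamond below). The following is a minimal, self-contained sketch of the queueing behaviour; the names are hypothetical stand-ins, and the real implementation is in the execution_graph.py diff below.

# Sketch only: ready jobs wait in a heap keyed on negated priority (heapq is a
# min-heap, so negating makes the largest priority pop first). Hypothetical names.
from heapq import heappush, heappop

job_priority = {'A': 13, 'B': 5, 'C': 3, 'D': 1}  # critical-path sums for the diamond above
ready_heap = []

def enqueue_ready(job_keys):
  for key in job_keys:
    heappush(ready_heap, (-job_priority[key], key))

def fill_free_workers(free_slots, submit):
  # Hand out the most "important" ready jobs, but never exceed the free worker slots.
  while ready_heap and free_slots > 0:
    _, key = heappop(ready_heap)
    submit(key)
    free_slots -= 1

submitted = []
enqueue_ready(['B', 'C'])               # both become ready once A finishes
fill_free_workers(1, submitted.append)
assert submitted == ['B']               # B (priority 5) beats C (priority 3); C keeps waiting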

Additionally, make tracking of ready-to-be-submitted jobs smarter: the status table now keeps a pending-dependency counter for each job, decremented every time one of its dependencies finishes.
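
A tiny self-contained sketch of that bookkeeping (hypothetical names; the real version is StatusTable.mark_one_successful_dependency / is_ready_to_submit in the diff below): when a job finishes, each direct dependee's counter is decremented, and a dependee whose counter reaches zero is ready to go into the priority queue.

from collections import defaultdict

# Pending-dependency counters for a diamond: D depends on B and C, which both depend on A.
pending = {'B': 1, 'C': 1, 'D': 2}
dependees = defaultdict(list, {'A': ['B', 'C'], 'B': ['D'], 'C': ['D']})

def on_job_finished(finished_key):
  # Decrement each direct dependee's counter; a dependee is ready once it hits zero.
  ready = []
  for dependee in dependees[finished_key]:
    pending[dependee] -= 1
    if pending[dependee] == 0:
      ready.append(dependee)
  return ready

assert on_job_finished('A') == ['B', 'C']  # both unblocked
assert on_job_finished('B') == []          # D still waits on C
assert on_job_finished('C') == ['D']       # now D is ready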

Testing Done:
$ ./pants test tests/python/pants_test:all
SUCCESS

CI green: https://travis-ci.org/megaserg/pants/builds/78164959

Bugs closed: 2109

Reviewed at https://rbcommons.com/s/twitter/r/2601/
megaserg authored and stuhood committed Sep 1, 2015
1 parent 73e25e7 commit a645a35
Showing 4 changed files with 172 additions and 17 deletions.
103 changes: 89 additions & 14 deletions src/python/pants/backend/jvm/tasks/jvm_compile/execution_graph.py
@@ -6,8 +6,10 @@
unicode_literals, with_statement)

import Queue as queue
import threading
import traceback
from collections import defaultdict
from collections import defaultdict, deque
from heapq import heappop, heappush

from pants.base.worker_pool import Work

@@ -19,19 +21,21 @@ class Job(object):
keys of its dependent jobs.
"""

def __init__(self, key, fn, dependencies, on_success=None, on_failure=None):
def __init__(self, key, fn, dependencies, size=0, on_success=None, on_failure=None):
"""
:param key: Key used to reference and look up jobs
:param fn callable: The work to perform
:param dependency_keys: List of keys for dependent jobs
:param dependencies: List of keys for dependent jobs
:param size: Estimated job size used for prioritization
:param on_success: Zero parameter callback to run if job completes successfully. Run on main
thread.
:param on_failure: Zero parameter callback to run if job fails. Run on main
thread."""
self.key = key
self.fn = fn
self.dependencies = dependencies
self.size = size
self.on_success = on_success
self.on_failure = on_failure

@@ -57,8 +61,9 @@ def run_failure_callback(self):
class StatusTable(object):
DONE_STATES = {SUCCESSFUL, FAILED, CANCELED}

def __init__(self, keys):
def __init__(self, keys, pending_dependencies_count):
self._statuses = {key: UNSTARTED for key in keys}
self._pending_dependencies_count = pending_dependencies_count

def mark_as(self, state, key):
self._statuses[key] = state
@@ -73,12 +78,15 @@ def failed_keys(self):
def get(self, key):
return self._statuses.get(key)

def mark_one_successful_dependency(self, key):
self._pending_dependencies_count[key] -= 1

def is_ready_to_submit(self, key):
return self._pending_dependencies_count[key] == 0

def are_all_done(self):
return all(s in self.DONE_STATES for s in self._statuses.values())

def are_all_successful(self, keys):
return all(stat is SUCCESSFUL for stat in [self._statuses[k] for k in keys])

def has_failures(self):
return any(stat is FAILED for stat in self._statuses.values())

@@ -118,6 +126,23 @@ def __init__(self, key):
.format(key))


class ThreadSafeCounter(object):
def __init__(self):
self.lock = threading.Lock()
self._counter = 0

def get(self):
with self.lock:
return self._counter

def increment(self):
with self.lock:
self._counter += 1

def decrement(self):
with self.lock:
self._counter -= 1

class ExecutionGraph(object):
"""A directed acyclic graph of work to execute.
@@ -130,6 +155,7 @@ def __init__(self, job_list):
:param job_list Job: list of Jobs to schedule and run.
"""
self._dependencies = defaultdict(list)
self._dependees = defaultdict(list)
self._jobs = {}
self._job_keys_as_scheduled = []
@@ -145,6 +171,8 @@ def __init__(self, job_list):
if len(self._job_keys_with_no_dependencies) == 0:
raise NoRootJobError()

self._job_priority = self._compute_job_priorities(job_list)

def format_dependee_graph(self):
return "\n".join([
"{} -> {{\n {}\n}}".format(key, ',\n '.join(self._dependees[key]))
@@ -162,8 +190,35 @@ def _schedule(self, job):
if len(dependency_keys) == 0:
self._job_keys_with_no_dependencies.append(key)

for dep_name in dependency_keys:
self._dependees[dep_name].append(key)
self._dependencies[key] = dependency_keys
for dependency_key in dependency_keys:
self._dependees[dependency_key].append(key)

def _compute_job_priorities(self, job_list):
"""Walks the dependency graph breadth-first, starting from the most dependent tasks,
and computes the job priority as the sum of the job sizes along the critical path."""

job_size = {job.key: job.size for job in job_list}
job_priority = defaultdict(int)

bfs_queue = deque()
for job in job_list:
if len(self._dependees[job.key]) == 0:
job_priority[job.key] = job_size[job.key]
bfs_queue.append(job.key)

satisfied_dependees_count = defaultdict(int)
while len(bfs_queue) > 0:
job_key = bfs_queue.popleft()
for dependency_key in self._dependencies[job_key]:
job_priority[dependency_key] = \
max(job_priority[dependency_key],
job_size[dependency_key] + job_priority[job_key])
satisfied_dependees_count[dependency_key] += 1
if satisfied_dependees_count[dependency_key] == len(self._dependees[dependency_key]):
bfs_queue.append(dependency_key)

return job_priority

def execute(self, pool, log):
"""Runs scheduled work, ensuring all dependencies for each element are done before execution.
@@ -188,22 +243,38 @@ def execute(self, pool, log):
"""
log.debug(self.format_dependee_graph())

status_table = StatusTable(self._job_keys_as_scheduled)
status_table = StatusTable(self._job_keys_as_scheduled,
{key: len(self._jobs[key].dependencies) for key in self._job_keys_as_scheduled})
finished_queue = queue.Queue()

def submit_jobs(job_keys):
heap = []
jobs_in_flight = ThreadSafeCounter()

def put_jobs_into_heap(job_keys):
for job_key in job_keys:
# minus because jobs with larger priority should go first
heappush(heap, (-self._job_priority[job_key], job_key))

def try_to_submit_jobs_from_heap():
def worker(worker_key, work):
try:
work()
result = (worker_key, SUCCESSFUL, None)
except Exception as e:
result = (worker_key, FAILED, e)
finished_queue.put(result)
jobs_in_flight.decrement()

for job_key in job_keys:
while len(heap) > 0 and jobs_in_flight.get() < pool.num_workers:
priority, job_key = heappop(heap)
jobs_in_flight.increment()
status_table.mark_as(QUEUED, job_key)
pool.submit_async_work(Work(worker, [(job_key, (self._jobs[job_key]))]))

def submit_jobs(job_keys):
put_jobs_into_heap(job_keys)
try_to_submit_jobs_from_heap()

try:
submit_jobs(self._job_keys_with_no_dependencies)

@@ -213,6 +284,7 @@ def worker(worker_key, work):
except queue.Empty:
log.debug("Waiting on \n {}\n".format("\n ".join(
"{}: {}".format(key, state) for key, state in status_table.unfinished_items())))
try_to_submit_jobs_from_heap()
continue

finished_job = self._jobs[finished_key]
@@ -227,8 +299,11 @@ def worker(worker_key, work):
log.debug(traceback.format_exc())
raise ExecutionFailure("Error in on_success for {}".format(finished_key), e)

ready_dependees = [dependee for dependee in direct_dependees
if status_table.are_all_successful(self._jobs[dependee].dependencies)]
ready_dependees = []
for dependee in direct_dependees:
status_table.mark_one_successful_dependency(dependee)
if status_table.is_ready_to_submit(dependee):
ready_dependees.append(dependee)

submit_jobs(ready_dependees)
else: # Failed or canceled.
24 changes: 24 additions & 0 deletions src/python/pants/backend/jvm/tasks/jvm_compile/jvm_compile_isolated_strategy.py
@@ -25,15 +25,36 @@
from pants.util.fileutil import atomic_copy


def create_size_estimators():
def file_line_count(source_file_name):
with open(source_file_name, 'rb') as fh:
return sum(1 for line in fh)

return {
'linecount': lambda sources: sum([file_line_count(filepath) for filepath in sources]),
'filecount': lambda sources: len(sources),
'filesize': lambda sources: sum([os.path.getsize(filepath) for filepath in sources]),
'constzero': lambda sources: 0
}

class JvmCompileIsolatedStrategy(JvmCompileStrategy):
"""A strategy for JVM compilation that uses per-target classpaths and analysis."""

size_estimators = create_size_estimators()

@classmethod
def size_estimator_by_name(cls, estimation_strategy_name):
return cls.size_estimators[estimation_strategy_name]

@classmethod
def register_options(cls, register, compile_task_name, supports_concurrent_execution):
if supports_concurrent_execution:
register('--worker-count', advanced=True, type=int, default=1,
help='The number of concurrent workers to use compiling with {task} with the '
'isolated strategy.'.format(task=compile_task_name))
register('--size-estimator', advanced=True,
choices=list(cls.size_estimators.keys()), default='filesize',
help='The method of target size estimation.')
register('--capture-log', advanced=True, action='store_true', default=False,
fingerprint=True,
help='Capture compilation output to per-target logs.')
@@ -58,6 +79,8 @@ def __init__(self, context, options, workdir, analysis_tools, compile_task_name,
worker_count = 1
self._worker_count = worker_count

self._size_estimator = self.size_estimator_by_name(options.size_estimator)

self._worker_pool = None

def name(self):
@@ -247,6 +270,7 @@ def work():
jobs.append(Job(self.exec_graph_key_for_target(compile_target),
create_work_for_vts(vts, compile_context, compile_target_closure),
[self.exec_graph_key_for_target(target) for target in invalid_dependencies],
self._size_estimator(compile_context.sources),
# If compilation and analysis work succeeds, validate the vts.
# Otherwise, fail it.
on_success=vts.update,
2 changes: 2 additions & 0 deletions src/python/pants/base/worker_pool.py
@@ -50,6 +50,8 @@ def __init__(self, parent_workunit, run_tracker, num_workers):

self._shutdown_hooks = []

self.num_workers = num_workers

def add_shutdown_hook(self, hook):
self._shutdown_hooks.append(hook)

60 changes: 57 additions & 3 deletions tests/python/pants_test/tasks/test_execution_graph.py
@@ -13,6 +13,7 @@


class ImmediatelyExecutingPool(object):
num_workers = 1

def submit_async_work(self, work):
work.func(*work.args_tuples[0])
@@ -43,12 +44,12 @@ def setUp(self):
def execute(self, exec_graph):
exec_graph.execute(ImmediatelyExecutingPool(), PrintLogger())

def job(self, name, fn, dependencies, on_success=None, on_failure=None):
def job(self, name, fn, dependencies, size=0, on_success=None, on_failure=None):
def recording_fn():
self.jobs_run.append(name)
fn()

return Job(name, recording_fn, dependencies, on_success, on_failure)
return Job(name, recording_fn, dependencies, size, on_success, on_failure)

def test_single_job(self):
exec_graph = ExecutionGraph([self.job("A", passing_fn, [])])
@@ -144,7 +145,7 @@ def test_failure_of_one_leg_of_tree_does_not_cancel_other(self):
with self.assertRaises(ExecutionFailure) as cm:
self.execute(exec_graph)

self.assertEqual(["B", "F", "A"], self.jobs_run)
self.assertTrue(self.jobs_run == ["B", "F", "A"] or self.jobs_run == ["B", "A", "F"])
self.assertEqual("Failed jobs: F", str(cm.exception))

def test_failure_of_disconnected_job_does_not_cancel_non_dependents(self):
@@ -194,3 +195,56 @@ def test_same_key_scheduled_twice_is_error(self):
self.job("Same", passing_fn, [])])

self.assertEqual("Unexecutable graph: Job already scheduled u'Same'", str(cm.exception))

def test_priorities_for_chain_of_jobs(self):
exec_graph = ExecutionGraph([self.job("A", passing_fn, [], 8),
self.job("B", passing_fn, ["A"], 4),
self.job("C", passing_fn, ["B"], 2),
self.job("D", passing_fn, ["C"], 1)])
self.assertEqual(exec_graph._job_priority, {"A": 15, "B": 7, "C": 3, "D": 1})
self.execute(exec_graph)
self.assertEqual(self.jobs_run, ["A", "B", "C", "D"])

def test_priorities_for_fork(self):
exec_graph = ExecutionGraph([self.job("A", passing_fn, [], 4),
self.job("B", passing_fn, ["A"], 2),
self.job("C", passing_fn, ["A"], 1)])
self.assertEqual(exec_graph._job_priority, {"A": 6, "B": 2, "C": 1})
self.execute(exec_graph)
self.assertEqual(self.jobs_run, ["A", "B", "C"])

def test_priorities_for_mirrored_fork(self):
exec_graph = ExecutionGraph([self.job("A", passing_fn, [], 4),
self.job("B", passing_fn, ["A"], 1),
self.job("C", passing_fn, ["A"], 2)])
self.assertEqual(exec_graph._job_priority, {"A": 6, "B": 1, "C": 2})
self.execute(exec_graph)
self.assertEqual(self.jobs_run, ["A", "C", "B"])

def test_priorities_for_diamond(self):
exec_graph = ExecutionGraph([self.job("A", passing_fn, [], 8),
self.job("B", passing_fn, ["A"], 4),
self.job("C", passing_fn, ["A"], 2),
self.job("D", passing_fn, ["B", "C"], 1)])
self.assertEqual(exec_graph._job_priority, {"A": 13, "B": 5, "C": 3, "D": 1})
self.execute(exec_graph)
self.assertEqual(self.jobs_run, ["A", "B", "C", "D"])

def test_priorities_for_mirrored_diamond(self):
exec_graph = ExecutionGraph([self.job("A", passing_fn, [], 8),
self.job("B", passing_fn, ["A"], 2),
self.job("C", passing_fn, ["A"], 4),
self.job("D", passing_fn, ["B", "C"], 1)])
self.assertEqual(exec_graph._job_priority, {"A": 13, "B": 3, "C": 5, "D": 1})
self.execute(exec_graph)
self.assertEqual(self.jobs_run, ["A", "C", "B", "D"])

def test_priorities_for_skewed_diamond(self):
exec_graph = ExecutionGraph([self.job("A", passing_fn, [], 1),
self.job("B", passing_fn, ["A"], 2),
self.job("C", passing_fn, ["B"], 4),
self.job("D", passing_fn, ["A"], 8),
self.job("E", passing_fn, ["C", "D"], 16)])
self.assertEqual(exec_graph._job_priority, {"A": 25, "B": 22, "C": 20, "D": 24, "E": 16})
self.execute(exec_graph)
self.assertEqual(self.jobs_run, ["A", "D", "B", "C", "E"])
