Change ES to a batch executor with variable batch size
When the queue is long, the batch size will increase (up to a maximum
size) to let the worker increase the throughput at the cost of
latency.

This change requires many adjustments, not only to the executor but
also to the worker pool. This makes it more obvious that the worker
pool is not immune to race conditions, especially when the timeout and
connection checks occur. There is now an additional TODO to refactor
the code and solve the concurrency problems.
stefano-maggiolo committed Sep 16, 2016
1 parent 63f095d commit 4757518
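
The core of the change is the sizing heuristic. A minimal sketch of the
formula introduced in EvaluationExecutor.max_operations_per_batch below
(queue_length and num_workers are stand-in parameters):

MAX_OPERATIONS_PER_BATCH = 25

def batch_size(queue_length, num_workers):
    # Roughly one operation per worker share of the queue, at least 1,
    # capped so that latency cannot degrade without bound.
    ratio = queue_length // num_workers + 1
    return min(max(ratio, 1), MAX_OPERATIONS_PER_BATCH)

batch_size(0, 4)    # -> 1: short queue, favor latency
batch_size(100, 4)  # -> 25: ratio is 26, capped at the maximum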
Showing 4 changed files with 261 additions and 135 deletions.
10 changes: 9 additions & 1 deletion cms/server/admin/templates/overview.html
@@ -115,7 +115,15 @@
var strings = [];
for (var i in response['data'])
{
var job = utils.repr_job(response['data'][i]['operation']);
var job = "";
if ($.isArray(response['data'][i]['operations'])) {
job = utils.repr_job(response['data'][i]['operations'][0]);
if (response['data'][i]['operations'].length > 1) {
job += ' and ' + (response['data'][i]['operations'].length - 1) + ' more';
}
} else {
job = utils.repr_job(response['data'][i]['operations']);
}
var start_time = utils.repr_time_ago(response['data'][i]['start_time']);
var connected = "Yes";
if (response['data'][i]['connected'] == false)
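This is the UI side of batching: each worker's status entry now carries
a list under 'operations' (a scalar is still handled for
compatibility), and the overview shows the first job plus a count. The
same logic as a minimal Python sketch, with repr_job standing in for
the utils.repr_job template helper:

def summarize(operations):
    if not isinstance(operations, list):
        return repr_job(operations)
    label = repr_job(operations[0])
    if len(operations) > 1:
        label += " and %d more" % (len(operations) - 1)
    return label
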
118 changes: 80 additions & 38 deletions cms/service/EvaluationService.py
@@ -44,6 +44,7 @@

from sqlalchemy import func, not_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload

from cms import ServiceCoord, get_service_shards
from cms.io import Executor, TriggeredService, rpc_method
@@ -65,20 +66,27 @@


class EvaluationExecutor(Executor):

# Real maximum number of operations to be sent to a worker.
MAX_OPERATIONS_PER_BATCH = 25

def __init__(self, evaluation_service):
"""Create the single executor for ES.
The executor just delegates work to the worker pool.
"""
super(EvaluationExecutor, self).__init__()
super(EvaluationExecutor, self).__init__(True)

self.evaluation_service = evaluation_service
self.pool = WorkerPool(self.evaluation_service)

# QueueItem (ESOperation) we have extracted from the queue,
# but not yet finished to execute.
self._currently_executing = None
# List of QueueItem (ESOperation) we have extracted from the
# queue, but not yet finished to execute.
self._currently_executing = []

# Lock used to guard the currently executing operations
self._current_execution_lock = gevent.coros.RLock()

# Whether execute needs to drop the currently executing
# operation.
@@ -99,48 +107,74 @@ def __contains__(self, item):
"""
return super(EvaluationExecutor, self).__contains__(item) or \
self._currently_executing == item or \
item in self._currently_executing or \
item in self.pool

def execute(self, entry):
"""Execute an operation in the queue.
def max_operations_per_batch(self):
"""Return the maximum number of operations per batch.
The operation might not be executed immediately because of
We derive the number from the length of the queue divided by
the number of workers, with a cap at MAX_OPERATIONS_PER_BATCH.
"""
# TODO: len(self.pool) is the total number of workers,
# including those that are disabled.
ratio = len(self._operation_queue) // len(self.pool) + 1
ret = min(max(ratio, 1), EvaluationExecutor.MAX_OPERATIONS_PER_BATCH)
logger.info("Ratio is %d, executing %d operations together.",
ratio, ret)
return ret

def execute(self, entries):
"""Execute a batch of operations in the queue.
The operations might not be executed immediately because of
lack of workers.
entry (QueueEntry): entry containing the operation to perform.
entries ([QueueEntry]): entries containing the operations to
perform.
"""
self._currently_executing = entry.item
side_data = (entry.priority, entry.timestamp)
with self._current_execution_lock:
self._currently_executing = []
for entry in entries:
operation = entry.item
# Side data is attached to the operation sent to the
# worker pool. In case the operation is lost, the pool
# will return it to us, and we will use it to
# re-enqueue it.
operation.side_data = (entry.priority, entry.timestamp)
self._currently_executing.append(operation)
res = None
while res is None and not self._drop_current:
while len(self._currently_executing) > 0:
self.pool.wait_for_workers()
if self._drop_current:
break
res = self.pool.acquire_worker(entry.item,
side_data=side_data)
self._drop_current = False
self._currently_executing = None
with self._current_execution_lock:
if len(self._currently_executing) == 0:
break
res = self.pool.acquire_worker(self._currently_executing)
if res is not None:
self._drop_current = False
self._currently_executing = []
break

def dequeue(self, operation):
"""Remove an item from the queue.
We need to override dequeue because in execute we wait for a
worker to become available to serve the operation, and if that
operation needed to be dequeued, we need to remove it also
from there.
We need to override dequeue because the operation to dequeue
might have already been extracted, but not yet executed.
operation (ESOperation)
"""
try:
super(EvaluationExecutor, self).dequeue(operation)
except KeyError:
if self._currently_executing == operation:
self._drop_current = True
else:
raise
with self._current_execution_lock:
for i in range(len(self._currently_executing)):
if self._currently_executing[i] == operation:
del self._currently_executing[i]
return
raise


def with_post_finish_lock(func):
@@ -400,9 +434,10 @@ def check_workers_timeout(self):
"""
lost_operations = self.get_executor().pool.check_timeouts()
for priority, timestamp, operation in lost_operations:
for operation in lost_operations:
logger.info("Operation %s put again in the queue because of "
"worker timeout.", operation)
priority, timestamp = operation.side_data
self.enqueue(operation, priority, timestamp)
return True

@@ -412,9 +447,10 @@ def check_workers_connection(self):
"""
lost_operations = self.get_executor().pool.check_connections()
for priority, timestamp, operation in lost_operations:
for operation in lost_operations:
logger.info("Operation %s put again in the queue because of "
"disconnected worker.", operation)
priority, timestamp = operation.side_data
self.enqueue(operation, priority, timestamp)
return True
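
Both checks rely on the same round trip: execute() attaches the
scheduling metadata to each operation before handing it to the pool, so
a lost operation carries everything needed to re-enqueue it, and the
pool can return bare operations instead of the old
(priority, timestamp, operation) triples. The pattern in isolation
(attribute and method names as in the diff):

# In execute(): stash the queue metadata on the operation itself.
operation.side_data = (entry.priority, entry.timestamp)

# On timeout or disconnection: recover it and put the operation back.
priority, timestamp = operation.side_data
self.enqueue(operation, priority, timestamp)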

@@ -440,16 +476,14 @@ def enqueue(self, operation, priority, timestamp):
operation, priority, timestamp) > 0

@with_post_finish_lock
def action_finished(self, data, plus, error=None):
def action_finished(self, data, shard, error=None):
"""Callback from a worker, to signal that is finished some
action (compilation or evaluation).
data (dict): the JobGroup, exported to dict.
plus (tuple): the tuple (shard, (priority, timestamp)).
shard (int): the shard finishing the action.
"""
shard, side_data = plus

# We notify the pool that the worker is available again for
# further work (no matter how the current request turned out,
# even if the worker encountered an error). If the pool
@@ -458,7 +492,8 @@ def action_finished(self, data, plus, error=None):
# this method and do nothing because in that case we know the
# operation has returned to the queue and perhaps already been
# reassigned to another worker.
if self.get_executor().pool.release_worker(shard):
to_ignore = self.get_executor().pool.release_worker(shard)
if to_ignore is True:
logger.info("Ignored result from worker %s as requested.", shard)
return

@@ -481,8 +516,10 @@ def action_finished(self, data, plus, error=None):
operation = ESOperation.from_dict(job.operation)
logger.info("`%s' completed. Success: %s.",
operation, job.success)
self.result_cache.add(
operation, Result(side_data, job, job.success))
if isinstance(to_ignore, list) and operation in to_ignore:
logger.info("`%s' result ignored as requested", operation)
else:
self.result_cache.add(operation, Result(job, job.success))
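
With batching, a worker's reply can be stale for only some of its
operations. The calling code above implies a widened release_worker
contract: True means the whole reply must be ignored, while a list
names the individual operations whose results were invalidated (for
example, dequeued while the worker was running). Restated as a compact
sketch (the JobGroup iteration is assumed; the WorkerPool side is not
part of this diff):

to_ignore = pool.release_worker(shard)
if to_ignore is True:
    return  # drop the whole reply, as requested by the pool
for job in job_group.jobs:  # assumed structure of the returned JobGroup
    operation = ESOperation.from_dict(job.operation)
    if isinstance(to_ignore, list) and operation in to_ignore:
        continue  # only this operation's result is stale
    result_cache.add(operation, Result(job, job.success))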

@with_post_finish_lock
def write_results(self, items):
@@ -511,6 +548,8 @@ def write_results(self, items):
with SessionGen() as session:
# Dictionary holding the objects we use repeatedly,
# indexed by id, to avoid querying them multiple times.
# TODO: this pattern is used in WorkerPool and should be
# abstracted away.
datasets = dict()
subs = dict()
srs = dict()
@@ -520,8 +559,10 @@ def write_results(self, items):

# Get dataset.
if dataset_id not in datasets:
datasets[dataset_id] = \
Dataset.get_from_id(dataset_id, session)
datasets[dataset_id] = session.query(Dataset)\
.filter(Dataset.id == dataset_id)\
.options(joinedload(Dataset.testcases))\
.first()
dataset = datasets[dataset_id]
if dataset is None:
logger.error("Could not find dataset %d in the database.",
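
Two improvements land here: the per-id dictionaries memoize each object
for the lifetime of the session (the pattern the new TODO wants to
abstract away), and joinedload tells SQLAlchemy to fetch the dataset's
testcases eagerly in the same SELECT, rather than issuing one lazy
query per access while a whole batch of results is written. The
memoization half, as a hypothetical helper (not part of this commit):

def get_cached(cache, object_id, fetch):
    # Fetch each object at most once per session and reuse it for
    # every result in the batch that refers to the same id.
    if object_id not in cache:
        cache[object_id] = fetch(object_id)
    return cache[object_id]

dataset = get_cached(datasets, dataset_id,
                     lambda id_: session.query(Dataset)
                     .filter(Dataset.id == id_)
                     .options(joinedload(Dataset.testcases))
                     .first())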
@@ -970,9 +1011,10 @@ def disable_worker(self, shard):
except ValueError:
return False

for priority, timestamp, operation in lost_operations:
for operation in lost_operations:
logger.info("Operation %s put again in the queue because "
"the worker was disabled.", operation)
priority, timestamp = operation.side_data
self.enqueue(operation, priority, timestamp)
return True

29 changes: 17 additions & 12 deletions cms/service/esoperations.py
@@ -547,6 +547,15 @@ def __repr__(self):
self.dataset_id,
self.testcase_codename)

def for_submission(self):
"""Return if the operation is for a submission or for a user test.
return (bool): True if this operation is for a submission.
"""
return self.type_ == ESOperation.COMPILATION or \
self.type_ == ESOperation.EVALUATION

def to_dict(self):
return {
"type": self.type_,
@@ -555,31 +564,27 @@ def to_dict(self):
"testcase_codename": self.testcase_codename
}

def build_job(self, session):
def build_job(self, object_, dataset):
"""Produce the Job for this operation.
Return the Job object that has to be sent to Workers to have
them perform the operation this object describes.
session (Session): the database session to use to fetch objects
if necessary.
object_ (Submission|UserTest): the object this operation
refers to (might be a submission or a user test).
dataset (Dataset): the dataset this operation refers to.
return (Job): the job encoding of the operation, as understood
by Workers and TaskTypes.
"""
result = None
dataset = Dataset.get_from_id(self.dataset_id, session)
if self.type_ == ESOperation.COMPILATION:
submission = Submission.get_from_id(self.object_id, session)
result = CompilationJob.from_submission(self, submission, dataset)
result = CompilationJob.from_submission(self, object_, dataset)
elif self.type_ == ESOperation.EVALUATION:
submission = Submission.get_from_id(self.object_id, session)
result = EvaluationJob.from_submission(self, submission, dataset)
result = EvaluationJob.from_submission(self, object_, dataset)
elif self.type_ == ESOperation.USER_TEST_COMPILATION:
user_test = UserTest.get_from_id(self.object_id, session)
result = CompilationJob.from_user_test(self, user_test, dataset)
result = CompilationJob.from_user_test(self, object_, dataset)
elif self.type_ == ESOperation.USER_TEST_EVALUATION:
user_test = UserTest.get_from_id(self.object_id, session)
result = EvaluationJob.from_user_test(self, user_test, dataset)
result = EvaluationJob.from_user_test(self, object_, dataset)
return result
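
build_job no longer queries the database itself: the caller resolves
the object and the dataset once and passes them in, which lets
EvaluationService batch and cache these lookups. A hypothetical call
site (the real one lives in EvaluationService and is not part of this
hunk), using the for_submission helper added above:

if operation.for_submission():
    object_ = Submission.get_from_id(operation.object_id, session)
else:
    object_ = UserTest.get_from_id(operation.object_id, session)
dataset = Dataset.get_from_id(operation.dataset_id, session)
job = operation.build_job(object_, dataset)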