Skip to content

Commit

Permalink
Use direct queries for sweeping missing operations in ES
Browse files Browse the repository at this point in the history
  • Loading branch information
stefano-maggiolo committed Jan 8, 2016
1 parent ab62f0c commit 1380d44
Show file tree
Hide file tree
Showing 6 changed files with 1,029 additions and 14 deletions.
31 changes: 31 additions & 0 deletions cms/db/usertest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Contest Management System - http://cms-dev.github.io/
# Copyright © 2012 Giovanni Mascellani <[email protected]>
# Copyright © 2012-2015 Luca Wehrstedt <[email protected]>
# Copyright © 2015-2016 Stefano Maggiolo <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -342,6 +343,13 @@ def compiled(self):
"""
return self.compilation_outcome is not None

@staticmethod
def filter_compiled():
    """Return a filtering expression for compiled user test results.

    return (ClauseElement): a SQLAlchemy expression selecting the
        results whose compilation outcome has been set.

    """
    outcome = UserTestResult.compilation_outcome
    return outcome.isnot(None)

def compilation_failed(self):
"""Return whether the user test result did not compile.
Expand All @@ -352,6 +360,14 @@ def compilation_failed(self):
"""
return self.compilation_outcome == "fail"

@staticmethod
def filter_compilation_failed():
    """Return a filtering expression for user test results whose
    compilation failed.

    return (ClauseElement): a SQLAlchemy expression selecting the
        results with a "fail" compilation outcome.

    """
    failed = UserTestResult.compilation_outcome == "fail"
    return failed

def compilation_succeeded(self):
"""Return whether the user test compiled.
Expand All @@ -362,6 +378,14 @@ def compilation_succeeded(self):
"""
return self.compilation_outcome == "ok"

@staticmethod
def filter_compilation_succeeded():
    """Return a filtering expression for user test results passing
    compilation.
    """
    return UserTestResult.compilation_outcome == "ok"

def evaluated(self):
"""Return whether the user test result has been evaluated.
Expand All @@ -370,6 +394,13 @@ def evaluated(self):
"""
return self.evaluation_outcome is not None

@staticmethod
def filter_evaluated():
    """Return a filtering expression for evaluated user test results.
    """
    return UserTestResult.evaluation_outcome.isnot(None)

def invalidate_compilation(self):
"""Blank all compilation and evaluation outcomes.
Expand Down
27 changes: 15 additions & 12 deletions cms/service/EvaluationService.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# Contest Management System - http://cms-dev.github.io/
# Copyright © 2010-2014 Giovanni Mascellani <[email protected]>
# Copyright © 2010-2015 Stefano Maggiolo <[email protected]>
# Copyright © 2010-2016 Stefano Maggiolo <[email protected]>
# Copyright © 2010-2012 Matteo Boscariol <[email protected]>
# Copyright © 2013-2015 Luca Wehrstedt <[email protected]>
# Copyright © 2013 Bernard Blackham <[email protected]>
Expand Down Expand Up @@ -45,13 +45,14 @@

from cms import ServiceCoord, get_service_shards
from cms.io import Executor, TriggeredService, rpc_method
from cms.db import SessionGen, Contest, Dataset, Submission, \
from cms.db import SessionGen, Dataset, Submission, \
SubmissionResult, Task, UserTest
from cms.service import get_submissions, get_submission_results, \
get_datasets_to_judge
from cms.service import get_datasets_to_judge, \
get_submissions, get_submission_results
from cms.grading.Job import Job

from .esoperations import ESOperation, get_relevant_operations, \
get_submissions_operations, get_user_tests_operations, \
submission_get_operations, submission_to_evaluate, \
user_test_get_operations
from .workerpool import WorkerPool
Expand Down Expand Up @@ -272,14 +273,16 @@ def _missing_operations(self):
"""
counter = 0
with SessionGen() as session:
contest = session.query(Contest).\
filter_by(id=self.contest_id).first()

# Scan through submissions and user tests
for submission in contest.get_submissions():
counter += self.submission_enqueue_operations(submission)
for user_test in contest.get_user_tests():
counter += self.user_test_enqueue_operations(user_test)

for operation, timestamp, priority in \
get_submissions_operations(session, self.contest_id):
if self.enqueue(operation, timestamp, priority):
counter += 1

for operation, timestamp, priority in \
get_user_tests_operations(session, self.contest_id):
if self.enqueue(operation, timestamp, priority):
counter += 1

return counter

Expand Down
237 changes: 235 additions & 2 deletions cms/service/esoperations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# Contest Management System - http://cms-dev.github.io/
# Copyright © 2010-2014 Giovanni Mascellani <[email protected]>
# Copyright © 2010-2015 Stefano Maggiolo <[email protected]>
# Copyright © 2010-2016 Stefano Maggiolo <[email protected]>
# Copyright © 2010-2012 Matteo Boscariol <[email protected]>
# Copyright © 2013-2015 Luca Wehrstedt <[email protected]>
# Copyright © 2013 Bernard Blackham <[email protected]>
Expand Down Expand Up @@ -33,8 +33,11 @@

import logging

from sqlalchemy import case, literal

from cms.io import PriorityQueue, QueueItem
from cms.db import Dataset, Submission, UserTest
from cms.db import Dataset, Evaluation, Submission, SubmissionResult, \
Task, Testcase, UserTest, UserTestResult
from cms.grading.Job import CompilationJob, EvaluationJob


Expand All @@ -47,6 +50,36 @@
MAX_USER_TEST_EVALUATION_TRIES = 3


# Shared SQLAlchemy filtering expressions used by the sweep queries
# below (get_submissions_operations / get_user_tests_operations).

# Datasets worth judging: the task's active dataset, plus any dataset
# with the autojudge flag set.
FILTER_SUBMISSION_DATASETS_TO_JUDGE = (
    (Dataset.id == Task.active_dataset_id) |
    (Dataset.autojudge.is_(True))
)
# Submission results still needing compilation: not yet compiled, and
# with some compilation tries left.
FILTER_SUBMISSION_RESULTS_TO_COMPILE = (
    (~SubmissionResult.filter_compiled()) &
    (SubmissionResult.compilation_tries < MAX_COMPILATION_TRIES)
)
# Submission results that compiled successfully but are not yet
# evaluated, and with some evaluation tries left.
FILTER_SUBMISSION_RESULTS_TO_EVALUATE = (
    SubmissionResult.filter_compilation_succeeded() &
    (~SubmissionResult.filter_evaluated()) &
    (SubmissionResult.evaluation_tries < MAX_EVALUATION_TRIES)
)


# Same expressions as above, for user tests.
# NOTE(review): these reuse MAX_COMPILATION_TRIES / MAX_EVALUATION_TRIES
# rather than user-test-specific limits (MAX_USER_TEST_EVALUATION_TRIES
# is defined above) — confirm this is intended.
FILTER_USER_TEST_DATASETS_TO_JUDGE = (
    (Dataset.id == Task.active_dataset_id) |
    (Dataset.autojudge.is_(True))
)
FILTER_USER_TEST_RESULTS_TO_COMPILE = (
    (~UserTestResult.filter_compiled()) &
    (UserTestResult.compilation_tries < MAX_COMPILATION_TRIES)
)
FILTER_USER_TEST_RESULTS_TO_EVALUATE = (
    UserTestResult.filter_compilation_succeeded() &
    (~UserTestResult.filter_evaluated()) &
    (UserTestResult.evaluation_tries < MAX_EVALUATION_TRIES)
)


def submission_to_compile(submission_result):
"""Return whether ES is interested in compiling the submission.
Expand Down Expand Up @@ -256,6 +289,199 @@ def get_relevant_operations(level, submissions, dataset_id=None):
return operations


def get_submissions_operations(session, contest_id):
    """Return all the operations to do for submissions in the contest.

    session (Session): the database session to use.
    contest_id (int): the contest for which we want the operations.

    return ([(ESOperation, int, datetime)]): a list of tuples
        (operation, priority, timestamp).

    """
    operations = []

    # Retrieve the compilation operations for all submissions without
    # the corresponding result for a dataset to judge. Since we have
    # no SubmissionResult, we cannot join regularly with dataset;
    # instead we take the cartesian product with all the datasets for
    # the correct task.
    to_compile = session.query(Submission)\
        .join(Submission.task)\
        .join(Task.datasets)\
        .outerjoin(SubmissionResult,
                   (Dataset.id == SubmissionResult.dataset_id) &
                   (Submission.id == SubmissionResult.submission_id))\
        .filter(
            (Task.contest_id == contest_id) &
            (FILTER_SUBMISSION_DATASETS_TO_JUDGE) &
            (SubmissionResult.dataset_id.is_(None)))\
        .with_entities(Submission.id, Dataset.id,
                       # Non-active datasets get extra-low priority;
                       # never-attempted compilations get high priority.
                       case([
                           (Dataset.id != Task.active_dataset_id,
                            literal(PriorityQueue.PRIORITY_EXTRA_LOW))
                       ], else_=literal(PriorityQueue.PRIORITY_HIGH)),
                       Submission.timestamp)\
        .all()

    # Retrieve all the compilation operations for submissions
    # already having a result for a dataset to judge.
    to_compile += session.query(Submission)\
        .join(Submission.task)\
        .join(Submission.results)\
        .join(SubmissionResult.dataset)\
        .filter(
            (Task.contest_id == contest_id) &
            (FILTER_SUBMISSION_DATASETS_TO_JUDGE) &
            (FILTER_SUBMISSION_RESULTS_TO_COMPILE))\
        .with_entities(Submission.id, Dataset.id,
                       case([
                           (Dataset.id != Task.active_dataset_id,
                            literal(PriorityQueue.PRIORITY_EXTRA_LOW)),
                           (SubmissionResult.compilation_tries == 0,
                            literal(PriorityQueue.PRIORITY_HIGH))
                       ], else_=literal(PriorityQueue.PRIORITY_MEDIUM)),
                       Submission.timestamp)\
        .all()

    for data in to_compile:
        submission_id, dataset_id, priority, timestamp = data
        operations.append((
            ESOperation(ESOperation.COMPILATION, submission_id, dataset_id),
            priority, timestamp))

    # Retrieve all the evaluation operations for a dataset to
    # judge. Again we need to pick all tuples (submission, dataset,
    # testcase) such that there is no evaluation for them, and to do
    # so we take the cartesian product with the testcases and later
    # ensure that there is no evaluation associated.
    to_evaluate = session.query(SubmissionResult)\
        .join(SubmissionResult.dataset)\
        .join(SubmissionResult.submission)\
        .join(Submission.task)\
        .join(Dataset.testcases)\
        .outerjoin(Evaluation,
                   (Evaluation.submission_id == Submission.id) &
                   (Evaluation.dataset_id == Dataset.id) &
                   (Evaluation.testcase_id == Testcase.id))\
        .filter(
            (Task.contest_id == contest_id) &
            (FILTER_SUBMISSION_DATASETS_TO_JUDGE) &
            (FILTER_SUBMISSION_RESULTS_TO_EVALUATE) &
            (Evaluation.id.is_(None)))\
        .with_entities(Submission.id, Dataset.id,
                       case([
                           (Dataset.id != Task.active_dataset_id,
                            literal(PriorityQueue.PRIORITY_EXTRA_LOW)),
                           (SubmissionResult.evaluation_tries == 0,
                            literal(PriorityQueue.PRIORITY_MEDIUM))
                       ], else_=literal(PriorityQueue.PRIORITY_LOW)),
                       Submission.timestamp,
                       Testcase.codename)\
        .all()

    for data in to_evaluate:
        submission_id, dataset_id, priority, timestamp, codename = data
        operations.append((
            ESOperation(
                ESOperation.EVALUATION, submission_id, dataset_id, codename),
            priority, timestamp))

    return operations


def get_user_tests_operations(session, contest_id):
    """Return all the operations to do for user tests in the contest.

    session (Session): the database session to use.
    contest_id (int): the contest for which we want the operations.

    return ([(ESOperation, int, datetime)]): a list of tuples
        (operation, priority, timestamp).

    """
    operations = []

    # Retrieve the compilation operations for all user tests without
    # the corresponding result for a dataset to judge. Since we have
    # no UserTestResult, we cannot join regularly with dataset;
    # instead we take the cartesian product with all the datasets for
    # the correct task.
    to_compile = session.query(UserTest)\
        .join(UserTest.task)\
        .join(Task.datasets)\
        .outerjoin(UserTestResult,
                   (Dataset.id == UserTestResult.dataset_id) &
                   (UserTest.id == UserTestResult.user_test_id))\
        .filter(
            (Task.contest_id == contest_id) &
            (FILTER_USER_TEST_DATASETS_TO_JUDGE) &
            (UserTestResult.dataset_id.is_(None)))\
        .with_entities(UserTest.id, Dataset.id,
                       # Non-active datasets get extra-low priority;
                       # never-attempted compilations get high priority.
                       case([
                           (Dataset.id != Task.active_dataset_id,
                            literal(PriorityQueue.PRIORITY_EXTRA_LOW))
                       ], else_=literal(PriorityQueue.PRIORITY_HIGH)),
                       UserTest.timestamp)\
        .all()

    # Retrieve all the compilation operations for user_tests
    # already having a result for a dataset to judge.
    to_compile += session.query(UserTest)\
        .join(UserTest.task)\
        .join(UserTest.results)\
        .join(UserTestResult.dataset)\
        .filter(
            (Task.contest_id == contest_id) &
            (FILTER_USER_TEST_DATASETS_TO_JUDGE) &
            (FILTER_USER_TEST_RESULTS_TO_COMPILE))\
        .with_entities(UserTest.id, Dataset.id,
                       case([
                           (Dataset.id != Task.active_dataset_id,
                            literal(PriorityQueue.PRIORITY_EXTRA_LOW)),
                           (UserTestResult.compilation_tries == 0,
                            literal(PriorityQueue.PRIORITY_HIGH))
                       ], else_=literal(PriorityQueue.PRIORITY_MEDIUM)),
                       UserTest.timestamp)\
        .all()

    for data in to_compile:
        user_test_id, dataset_id, priority, timestamp = data
        operations.append((
            ESOperation(ESOperation.COMPILATION, user_test_id, dataset_id),
            priority, timestamp))

    # Retrieve all the evaluation operations for a dataset to judge,
    # that is, all pairs (user_test, dataset) for which we have a
    # user test result which is compiled but not evaluated.
    to_evaluate = session.query(UserTest)\
        .join(UserTest.task)\
        .join(UserTest.results)\
        .join(UserTestResult.dataset)\
        .filter(
            (Task.contest_id == contest_id) &
            (FILTER_USER_TEST_DATASETS_TO_JUDGE) &
            (FILTER_USER_TEST_RESULTS_TO_EVALUATE))\
        .with_entities(UserTest.id, Dataset.id,
                       case([
                           (Dataset.id != Task.active_dataset_id,
                            literal(PriorityQueue.PRIORITY_EXTRA_LOW)),
                           (UserTestResult.evaluation_tries == 0,
                            literal(PriorityQueue.PRIORITY_MEDIUM))
                       ], else_=literal(PriorityQueue.PRIORITY_LOW)),
                       UserTest.timestamp)\
        .all()

    for data in to_evaluate:
        user_test_id, dataset_id, priority, timestamp = data
        operations.append((
            ESOperation(
                ESOperation.EVALUATION, user_test_id, dataset_id),
            priority, timestamp))

    return operations


class ESOperation(QueueItem):

COMPILATION = "compile"
Expand Down Expand Up @@ -294,6 +520,13 @@ def __str__(self):
return "%s on %d against dataset %d" % (
self.type_, self.object_id, self.dataset_id)

def __repr__(self):
    """Return a tuple-like representation of the operation, also
    showing the testcase codename (which __str__ omits).
    """
    fields = (self.type_, self.object_id,
              self.dataset_id, self.testcase_codename)
    return "(\"%s\", %s, %s, %s)" % fields

def to_dict(self):
return {"type": self.type_,
"object_id": self.object_id,
Expand Down
Loading

0 comments on commit 1380d44

Please sign in to comment.