Skip to content

Commit

Permalink
Parallel startup of processes in functional tests
Browse files Browse the repository at this point in the history
In my machine, this reduces the startup time (till first submission)
from 21s to 9s.
  • Loading branch information
stefano-maggiolo committed Sep 21, 2016
1 parent b6fa831 commit 112e82c
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 257 deletions.
19 changes: 12 additions & 7 deletions cmstestsuite/RunUnitTests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-

# Contest Management System - http://cms-dev.github.io/
# Copyright © 2013 Stefano Maggiolo <[email protected]>
# Copyright © 2013-2016 Stefano Maggiolo <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
Expand All @@ -22,17 +22,21 @@
from __future__ import unicode_literals

import io
import logging
import os
import sys
import subprocess
import datetime
from argparse import ArgumentParser

from cms import utf8_decoder
from cmstestsuite import CONFIG, FrameworkException, info, sh
from cmstestsuite import CONFIG, FrameworkException, sh
from cmstestsuite import combine_coverage


logger = logging.getLogger(__name__)


FAILED_UNITTEST_FILENAME = '.unittestfailures'


Expand All @@ -43,21 +47,21 @@ def run_unittests(test_list):
format (path, filename.py).
return (int):
"""
info("Running unit tests...")
logger.info("Running unit tests...")

failures = []
num_tests_to_execute = len(test_list)

# For all tests...
for i, (path, filename) in enumerate(test_list):
info("Running test %d/%d: %s.%s" % (
logger.info("Running test %d/%d: %s.%s" % (
i + 1, num_tests_to_execute,
path, filename))
try:
sh('python2 -m coverage run -p --source=cms %s' %
os.path.join(path, filename))
except FrameworkException:
info(" (FAILED: %s)" % filename)
logger.info(" (FAILED: %s)" % filename)

# Add this case to our list of failures, if we haven't already.
failures.append((path, filename))
Expand Down Expand Up @@ -180,7 +184,8 @@ def main():
return 0

if args.retry_failed:
info("Re-running %d failed tests from last run." % len(test_list))
logger.info("Re-running %d failed tests from last run." %
len(test_list))

# Load config from cms.conf.
CONFIG["TEST_DIR"] = git_root
Expand All @@ -194,7 +199,7 @@ def main():
os.environ["PYTHONPATH"] = "%(TEST_DIR)s" % CONFIG

# Clear out any old coverage data.
info("Clearing old coverage data.")
logger.info("Clearing old coverage data.")
sh("python -m coverage erase")

# Run all of our test cases.
Expand Down
229 changes: 8 additions & 221 deletions cmstestsuite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,24 @@
from __future__ import print_function
from __future__ import unicode_literals

import atexit
import errno
import io
import json
import logging
import mechanize
import os
import re
import signal
import socket
import subprocess
import time
from urlparse import urlsplit

from cmstestsuite.web import browser_do_request
from cmstestsuite.web.AWSRequests import \
AWSLoginRequest, AWSSubmissionViewRequest
from cmstestsuite.web.CWSRequests import CWSLoginRequest, SubmitRequest


logger = logging.getLogger(__name__)


# CONFIG is populated by our test script.
CONFIG = {
'VERBOSITY': 0,
Expand All @@ -54,12 +53,6 @@
cms_config = None


# We store a list of all services that are running so that we can cleanly shut
# them down.
running_services = {}
running_servers = {}


# List of users and tasks we created as part of the test.
created_users = {}
created_tasks = {}
Expand Down Expand Up @@ -107,48 +100,6 @@ class FrameworkException(Exception):
pass


class RemoteService(object):
"""Class which implements the RPC protocol used by CMS.
This is deliberately a re-implementation in order to catch or
trigger bugs in the CMS services.
"""
def __init__(self, service_name, shard):
address, port = cms_config["core_services"][service_name][shard]

self.service_name = service_name
self.shard = shard
self.address = address
self.port = port

def call(self, function_name, data):
"""Perform a synchronous RPC call."""
s = json.dumps({
"__id": "foo",
"__method": function_name,
"__data": data,
})
msg = s + "\r\n"

# Send message.
sock = socket.socket()
sock.connect((self.address, self.port))
sock.send(msg)

# Wait for response.
s = ''
while len(s) < 2 or s[-2:] != "\r\n":
s += sock.recv(1)
s = s[:-2]
sock.close()

# Decode reply.
reply = json.loads(s)

return reply


def read_cms_config():
global cms_config
cms_config = json.load(io.open("%(CONFIG_PATH)s" % CONFIG,
Expand All @@ -165,12 +116,12 @@ def sh(cmdline, ignore_failure=False):
"""Execute a simple shell command.
If cmdline is a string, it is passed to sh -c verbatim. All escaping must
be performed by the user. If cmdline is an array, then no escaping is
be performed by the user. If cmdline is an array, then no escaping is
required.
"""
if CONFIG["VERBOSITY"] >= 1:
print('$', cmdline)
logger.info(str('$' + cmdline))
if CONFIG["VERBOSITY"] >= 3:
cmdline += ' > /dev/null 2>&1'
if isinstance(cmdline, list):
Expand All @@ -183,37 +134,6 @@ def sh(cmdline, ignore_failure=False):
(ret & 0xff, ret >> 8, cmdline))


def spawn(cmdline):
"""Execute a python application."""

def kill(job):
try:
job.kill()
except OSError:
pass

if CONFIG["VERBOSITY"] >= 1:
print('$', ' '.join(cmdline))

if CONFIG["TEST_DIR"] is not None and CONFIG.get("COVERAGE"):
cmdline = ['python', '-m', 'coverage', 'run', '-p', '--source=cms'] + \
cmdline

if CONFIG["VERBOSITY"] >= 3:
stdout = None
stderr = None
else:
stdout = io.open(os.devnull, 'wb')
stderr = stdout
job = subprocess.Popen(cmdline, stdout=stdout, stderr=stderr)
atexit.register(lambda: kill(job))
return job


def info(s):
print('==>', s)


def configure_cms(options):
"""Creates the cms.conf file, setting any parameters as requested.
Expand Down Expand Up @@ -248,141 +168,8 @@ def configure_cms(options):
read_cms_config()


def start_prog(path, shard=0, contest=None):
"""Execute a CMS process."""
args = [path]
if shard is not None:
args.append("%s" % shard)
if contest is not None:
args += ['-c', "%s" % contest]
return spawn(args)


def start_servicer(service_name, check, shard=0, contest=None):
"""Start a CMS service."""

info("Starting %s." % service_name)
executable = os.path.join('.', 'scripts', 'cms%s' % (service_name))
if CONFIG["TEST_DIR"] is None:
executable = 'cms%s' % service_name
prog = start_prog(executable, shard=shard, contest=contest)

# Wait for service to come up - ping it!
attempts = 0
while attempts <= 12:
attempts += 1
try:
try:
check(service_name, shard)
except socket.error as error:
if error.errno != errno.ECONNREFUSED:
raise error
else:
time.sleep(0.1 * (1.2 ** attempts))
continue
else:
return prog
except Exception:
print("Unexpected exception while waiting for the service:")
raise

# If we arrive here, it means the service was not fired up.
if shard is None:
raise FrameworkException("Failed to bring up service %s" %
service_name)
else:
raise FrameworkException("Failed to bring up service %s/%d" %
(service_name, shard))


def check_service(service_name, shard):
"""Check if the service is up."""
rs = RemoteService(service_name, shard)
reply = rs.call("echo", {"string": "hello"})
if reply['__data'] != 'hello':
raise Exception("Strange response from service.")


def start_service(service_name, shard=0, contest=None):
"""Start a CMS service."""
prog = start_servicer(service_name, check_service, shard, contest)
rs = RemoteService(service_name, shard)
running_services[(service_name, shard, contest)] = (rs, prog)

return prog


def restart_service(service_name, shard=0, contest=None):
shutdown_service(service_name, shard, contest)
return start_service(service_name, shard, contest)


def check_server(service_name, shard):
"""Check if the server is up."""
check_service(service_name, shard)
if service_name == 'AdminWebServer':
port = cms_config['admin_listen_port']
else:
port = cms_config['contest_listen_port'][shard]
sock = socket.socket()
sock.connect(('127.0.0.1', port))
sock.close()


def start_server(service_name, shard=0, contest=None):
"""Start a CMS server."""
prog = start_servicer(service_name, check_server, shard, contest)
running_servers[service_name] = prog

return prog


def check_ranking_web_server(service_name, shard):
"""Check if RankingWebServer is up."""
assert service_name == "RankingWebServer"
assert shard is None
url = urlsplit(cms_config['rankings'][0])
sock = socket.socket()
sock.connect((url.hostname, url.port))
sock.close()


def start_ranking_web_server():
"""Start the RankingWebServer. It's a bit special compared to the
others.
"""
prog = start_servicer(
"RankingWebServer", check_ranking_web_server, shard=None)
running_servers['RankingWebServer'] = prog
return prog


def shutdown_service(service_name, shard=0, contest=None):
rs, prog = running_services[(service_name, shard, contest)]

info("Asking %s/%d to terminate..." % (service_name, shard))
rs = running_services[(service_name, shard, contest)]
rs = RemoteService(service_name, shard)
rs.call("quit", {"reason": "from test harness"})
prog.wait()

del running_services[(service_name, shard, contest)]


def shutdown_services():
for key in running_services.keys():
service_name, shard, contest = key
shutdown_service(service_name, shard, contest)

for name, server in running_servers.iteritems():
info("Terminating %s." % name)
os.kill(server.pid, signal.SIGINT)
server.wait()


def combine_coverage():
info("Combining coverage results.")
logger.info("Combining coverage results.")
sh("python -m coverage combine")


Expand All @@ -392,7 +179,7 @@ def initialize_aws(rand):
rand (int): some random bit to add to the admin username.
"""
info("Creating admin...")
logger.info("Creating admin...")
admin_info["username"] = "admin%s" % rand
admin_info["password"] = "adminpwd"
sh("python cmscontrib/AddAdmin.py %(username)s -p %(password)s"
Expand Down
Loading

0 comments on commit 112e82c

Please sign in to comment.