diff --git a/.travis.yml b/.travis.yml index c99b8a8..842d97f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,9 +5,16 @@ python: - "3.4" # command to install dependencies install: - - "python setup.py install" + # Install package and its dependency for full test - "pip install coverage coveralls" - "pip install sphinx_bootstrap_theme" + # Install slurm + - "sudo apt-get install slurm-llnl " + - "sudo /usr/sbin/create-munge-key" + - "sudo service munge start" + - "sudo python continuous_integration/configure_slurm.py" + # command to run tests script: make after_success: coveralls +cache: apt diff --git a/clusterlib/scheduler.py b/clusterlib/scheduler.py index d4de9a7..4e939c0 100644 --- a/clusterlib/scheduler.py +++ b/clusterlib/scheduler.py @@ -11,39 +11,52 @@ # Authors: Arnaud Joly # # License: BSD 3 clause +from __future__ import unicode_literals +import os import subprocess from xml.etree import ElementTree + __all__ = [ "queued_or_running_jobs", "submit" ] -def _sge_queued_or_running_jobs(): +def _sge_queued_or_running_jobs(user=None): + """Get queued or running jobs from SGE queue system""" + command = "qstat -xml" + if user is not None: + command += " -u {}".format(user) + try: - xml = subprocess.check_output("qstat -xml", shell=True, - stderr=subprocess.PIPE) - tree = ElementTree.fromstring(xml) - return [leaf.text for leaf in tree.iter("JB_name")] + with open(os.devnull, 'w') as shutup: + xml = subprocess.check_output(command, shell=True, stderr=shutup) + tree = ElementTree.fromstring(xml) + return [leaf.text for leaf in tree.iter("JB_name")] except subprocess.CalledProcessError: # qstat is not available return [] -def _slurm_queued_or_running_jobs(): +def _slurm_queued_or_running_jobs(user=None): + """Get queued or running jobs from SLURM queue system""" + command = "squeue --noheader -o %j" + if user is not None: + command += " -u {}".format(user) + try: - out = subprocess.check_output("squeue --noheader -o %j", shell=True, - stderr=subprocess.PIPE) - 
out = out.split("\n")[:-1] - return out + with open(os.devnull, 'w') as shutup: + out = subprocess.check_output(command, shell=True, stderr=shutup) + out = out.splitlines() + return out except subprocess.CalledProcessError: # squeue is not available return [] -def queued_or_running_jobs(): +def queued_or_running_jobs(user=None): """Return the names of the queued or running jobs under SGE and SLURM The list of jobs could be either the list of all jobs on the scheduler @@ -52,6 +65,11 @@ def queued_or_running_jobs(): Try ``qstat`` in SGE or ``squeue`` in SLURM to know which behavior it follows. + Parameters + ---------- + user : str or None, (default=None) + Filter the job list using a given user name. + Returns ------- out : list of string, @@ -62,7 +80,7 @@ def queued_or_running_jobs(): out = [] for queued_or_running in (_sge_queued_or_running_jobs, _slurm_queued_or_running_jobs): - out.extend(queued_or_running()) + out.extend(queued_or_running(user=user)) return out diff --git a/clusterlib/tests/test_scheduler.py b/clusterlib/tests/test_scheduler.py index 28a7226..88d536f 100644 --- a/clusterlib/tests/test_scheduler.py +++ b/clusterlib/tests/test_scheduler.py @@ -1,19 +1,78 @@ # Authors: Arnaud Joly # # License: BSD 3 clause +from __future__ import unicode_literals + +import os +import subprocess +import sys +from getpass import getuser + + from nose.tools import assert_equal from nose.tools import assert_raises +from nose.tools import assert_in +from nose import SkipTest from ..scheduler import queued_or_running_jobs from ..scheduler import submit -def test_smoke_test(): - # XXX : need a better way to test those functions - queued_or_running_jobs() +def test_queued_or_running_jobs_slurm(): + """Test queued or running job function on slurm""" + + with open(os.devnull, 'w') as shutup: + has_sbatch = 1 - subprocess.check_call(["which", "sbatch"], + stdout=shutup, stderr=shutup) + + if has_sbatch: + user = getuser() + job_name = "test-sleepy-job" + + # Launch a 
sleepy slurm job + sleep_job = submit(job_command="sleep 600", job_name=job_name, + backend="slurm", time="10:00", memory=100) + os.system(sleep_job) + + # Get job id + job_id = None + try: + out = subprocess.check_output( + "squeue --noheader -o '%j %i' -u {0} | grep {1}" + "".format(user, job_name), + shell=True) + job_id = out.split()[-1] + + except subprocess.CalledProcessError as error: + print(error.output) + raise + + # Assert that the job has been launched + try: + running_jobs = queued_or_running_jobs(user=user) + if sys.version_info[0] == 3: + # the bytes should be decoded before, right after you read them + # (e.g. from a socket or a file). In Python 2 this is done + # implicitly with a random (platform specific) encoding. + # In Python 3 you have to decode bytes objects into unicode + # string explicitly with an appropriate encoding depending on + # where the bytes come from. + + running_jobs = [s.decode("utf8") for s in running_jobs] + + assert_in(job_name, running_jobs) + finally: + # Make sure to clean up even if there is a failure + os.system("scancel %s" % job_id) + if os.path.exists("slurm-%s.out" % job_id): + os.remove("slurm-%s.out" % job_id) + + else: + raise SkipTest("sbatch is missing") def test_submit(): + """Test submit formatting function""" assert_equal( submit(job_command="python main.py", backend="sge"), 'echo \'#!/bin/bash\npython main.py\' | qsub -N "job" -l ' diff --git a/continuous_integration/configure_slurm.py b/continuous_integration/configure_slurm.py new file mode 100644 index 0000000..5da80cf --- /dev/null +++ b/continuous_integration/configure_slurm.py @@ -0,0 +1,96 @@ +# This file is taken from the Pulsar project +# (https://github.com/galaxyproject/pulsar) which is apache 2.0 +# + + +from socket import gethostname +from string import Template +from subprocess import call +from getpass import getuser + +SLURM_CONFIG_TEMPLATE = ''' +# slurm.conf file generated by configurator.html. +# Put this file on all nodes of your cluster. 
+# See the slurm.conf man page for more information. +# +ControlMachine=$hostname +#ControlAddr= +#BackupController= +#BackupAddr= +# +AuthType=auth/munge +CacheGroups=0 +#CheckpointType=checkpoint/none +CryptoType=crypto/munge +MpiDefault=none +#PluginDir= +#PlugStackConfig= +#PrivateData=jobs +ProctrackType=proctrack/pgid +#Prolog= +#PrologSlurmctld= +#PropagatePrioProcess=0 +#PropagateResourceLimits= +#PropagateResourceLimitsExcept= +ReturnToService=1 +#SallocDefaultCommand= +SlurmctldPidFile=/var/run/slurmctld.pid +SlurmctldPort=6817 +SlurmdPidFile=/var/run/slurmd.pid +SlurmdPort=6818 +SlurmdSpoolDir=/tmp/slurmd +SlurmUser=$user +#SlurmdUser=root +#SrunEpilog= +#SrunProlog= +StateSaveLocation=/tmp +SwitchType=switch/none +#TaskEpilog= +TaskPlugin=task/none +#TaskPluginParam= +#TaskProlog= +InactiveLimit=0 +KillWait=30 +MinJobAge=300 +#OverTimeLimit=0 +SlurmctldTimeout=120 +SlurmdTimeout=300 +#UnkillableStepTimeout=60 +#VSizeFactor=0 +Waittime=0 +FastSchedule=2 +SchedulerType=sched/backfill +SchedulerPort=7321 +SelectType=select/linear +#SelectTypeParameters= +AccountingStorageType=accounting_storage/none +#AccountingStorageUser= +AccountingStoreJobComment=YES +ClusterName=cluster +#DebugFlags= +#JobCompHost= +#JobCompLoc= +#JobCompPass= +#JobCompPort= +JobCompType=jobcomp/none +#JobCompUser= +JobAcctGatherFrequency=30 +JobAcctGatherType=jobacct_gather/none +SlurmctldDebug=3 +#SlurmctldLogFile= +SlurmdDebug=3 +#SlurmdLogFile= +NodeName=$hostname CPUs=1 State=UNKNOWN RealMemory=5000 +PartitionName=debug Nodes=$hostname Default=YES MaxTime=INFINITE State=UP +''' + + +def main(): + template_params = {"hostname": gethostname(), "user": getuser()} + config_contents = Template(SLURM_CONFIG_TEMPLATE).substitute(template_params) + open("/etc/slurm-llnl/slurm.conf", "w").write(config_contents) + call("slurmctld") + call("slurmd") + +if __name__ == "__main__": + main() diff --git a/doc/whats_new.rst b/doc/whats_new.rst index 66d89c9..bc6f28e 100644 --- a/doc/whats_new.rst 
+++ b/doc/whats_new.rst @@ -8,7 +8,13 @@ dev Changelog --------- + - Add Travis workers with SLURM. By `Arnaud Joly`_ + - Fix possible deadlock occurrence due to the use of ``subprocess.PIPE``. + By `Arnaud Joly`_ + + - Add the possibility to specify a user in + :func:`scheduler.queued_or_running_jobs`. By `Arnaud Joly`_ 0.1 ===