TST slurm related function on travis + ENH allow setting a user with queued_or_running_jobs + FIX possible deadlock
arjoly committed Jan 23, 2015
1 parent e4c19d6 commit 57d708b
Showing 5 changed files with 202 additions and 16 deletions.
9 changes: 8 additions & 1 deletion .travis.yml
@@ -5,9 +5,16 @@ python:
- "3.4"
# command to install dependencies
install:
- "python setup.py install"
# Install package and its dependency for full test
- "pip install coverage coveralls"
- "pip install sphinx_bootstrap_theme"
# Install slurm
- "sudo apt-get install slurm-llnl "
- "sudo /usr/sbin/create-munge-key"
- "sudo service munge start"
- "sudo python continuous_integration/configure_slurm.py"

# command to run tests
script: make
after_success: coveralls
cache: apt
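
The install steps above provision SLURM together with its munge authentication service on the Travis worker. A minimal sanity check, hypothetical and not part of this commit, could confirm that the client tools actually landed on the PATH before the test suite runs:

# Hypothetical post-install check, not part of the commit: verify that
# the SLURM client tools are available.
import subprocess

for tool in ("sbatch", "squeue", "scancel"):
    # `which` exits with a non-zero status when the tool is missing.
    if subprocess.call(["which", tool]) != 0:
        raise RuntimeError("%s not found; SLURM install failed" % tool)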
42 changes: 30 additions & 12 deletions clusterlib/scheduler.py
@@ -11,39 +11,52 @@
# Authors: Arnaud Joly
#
# License: BSD 3 clause
from __future__ import unicode_literals

import os
import subprocess
from xml.etree import ElementTree


__all__ = [
"queued_or_running_jobs",
"submit"
]


def _sge_queued_or_running_jobs():
def _sge_queued_or_running_jobs(user=None):
    """Get queued or running jobs from SGE queue system"""
    command = "qstat -xml"
    if user is not None:
        command += " -u {}".format(user)

    try:
        xml = subprocess.check_output("qstat -xml", shell=True,
                                      stderr=subprocess.PIPE)
        tree = ElementTree.fromstring(xml)
        return [leaf.text for leaf in tree.iter("JB_name")]
        with open(os.devnull, 'w') as shutup:
            xml = subprocess.check_output(command, shell=True, stderr=shutup)
            tree = ElementTree.fromstring(xml)
            return [leaf.text for leaf in tree.iter("JB_name")]
    except subprocess.CalledProcessError:
        # qstat is not available
        return []


def _slurm_queued_or_running_jobs():
def _slurm_queued_or_running_jobs(user=None):
    """Get queued or running jobs from SLURM queue system"""
    command = "squeue --noheader -o %j"
    if user is not None:
        command += " -u {}".format(user)

    try:
        out = subprocess.check_output("squeue --noheader -o %j", shell=True,
                                      stderr=subprocess.PIPE)
        out = out.split("\n")[:-1]
        return out
        with open(os.devnull, 'w') as shutup:
            out = subprocess.check_output(command, shell=True, stderr=shutup)
            out = out.splitlines()
            return out
    except subprocess.CalledProcessError:
        # squeue is not available
        return []


def queued_or_running_jobs():
def queued_or_running_jobs(user=None):
    """Return the names of the queued or running jobs under SGE and SLURM

    The list of jobs could be either the list of all jobs on the scheduler
@@ -52,6 +65,11 @@ def queued_or_running_jobs():
    Try ``qstat`` in SGE or ``squeue`` in SLURM to know which behavior it
    follows.

    Parameters
    ----------
    user : str or None, (default=None)
        Filter the job list using a given user name.

    Returns
    -------
    out : list of string,
@@ -62,7 +80,7 @@ def queued_or_running_jobs():
    out = []
    for queued_or_running in (_sge_queued_or_running_jobs,
                              _slurm_queued_or_running_jobs):
        out.extend(queued_or_running())
        out.extend(queued_or_running(user=user))

    return out
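
For illustration (this snippet is not part of the diff), filtering the queue listing down to the current user now works like this:

# Hypothetical usage sketch of the new user parameter.
from getpass import getuser
from clusterlib.scheduler import queued_or_running_jobs

# With user=None (the default) the previous behavior is preserved;
# with a user name, only that user's jobs are listed.
print(queued_or_running_jobs(user=getuser()))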

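The switch from stderr=subprocess.PIPE to an os.devnull handle is the deadlock fix announced in the commit title: check_output only drains stdout, so a child process writing enough to an unread stderr pipe can fill the OS buffer and block forever. A minimal sketch of the safe pattern, not the library code itself:

import os
import subprocess

def run_quietly(command):
    # /dev/null can never fill up, unlike an unread PIPE, so the child
    # cannot block while writing its error output.
    with open(os.devnull, 'w') as devnull:
        return subprocess.check_output(command, shell=True, stderr=devnull)
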
65 changes: 62 additions & 3 deletions clusterlib/tests/test_scheduler.py
@@ -1,19 +1,78 @@
# Authors: Arnaud Joly
#
# License: BSD 3 clause
from __future__ import unicode_literals

import os
import subprocess
import sys
from getpass import getuser


from nose.tools import assert_equal
from nose.tools import assert_raises
from nose.tools import assert_in
from nose import SkipTest

from ..scheduler import queued_or_running_jobs
from ..scheduler import submit


def test_smoke_test():
    # XXX : need a better way to test those functions
    queued_or_running_jobs()
def test_queued_or_running_jobs_slurm():
    """Test queued or running job function on slurm"""

    with open(os.devnull, 'w') as shutup:
        # subprocess.call returns the exit status instead of raising, so a
        # missing sbatch leads to the SkipTest below rather than an error.
        has_sbatch = subprocess.call(["which", "sbatch"],
                                     stdout=shutup, stderr=shutup) == 0

    if has_sbatch:
        user = getuser()
        job_name = "test-sleepy-job"

        # Launch a sleepy slurm job
        sleep_job = submit(job_command="sleep 600", job_name=job_name,
                           backend="slurm", time="10:00", memory=100)
        os.system(sleep_job)

        # Get job id
        job_id = None
        try:
            out = subprocess.check_output(
                "squeue --noheader -o '%j %i' -u {0} | grep {1}"
                "".format(user, job_name),
                shell=True)
            job_id = out.split()[-1]

        except subprocess.CalledProcessError as error:
            print(error.output)
            raise

        # Assert that the job has been launched
        try:
            running_jobs = queued_or_running_jobs(user=user)
            if sys.version_info[0] == 3:
                # Bytes should be decoded right after you read them (e.g.
                # from a socket or a file). In Python 2 this is done
                # implicitly with a random (platform-specific) encoding; in
                # Python 3 you have to decode bytes objects into unicode
                # strings explicitly, with an appropriate encoding depending
                # on where the bytes come from.
                running_jobs = [s.decode("utf8") for s in running_jobs]

            assert_in(job_name, running_jobs)
        finally:
            # Make sure to clean up even if there is a failure
            os.system("scancel %s" % job_id)
            if os.path.exists("slurm-%s.out" % job_id):
                os.remove("slurm-%s.out" % job_id)

    else:
        raise SkipTest("sbatch is missing")
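
Note in passing that submit only formats the submission command; nothing is launched until the returned string is executed, as the test does with os.system. An illustrative sketch (the job name is made up, and the sbatch string shown in the comment is approximate):

import os
from clusterlib.scheduler import submit

# submit() returns a shell command string, roughly of the form
# echo '#!/bin/bash\nsleep 600' | sbatch ... (approximation)
cmd = submit(job_command="sleep 600", job_name="demo-job",
             backend="slurm", time="10:00", memory=100)
os.system(cmd)  # only now is the job actually queued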


def test_submit():
    """Test submit formatting function"""
    assert_equal(
        submit(job_command="python main.py", backend="sge"),
        'echo \'#!/bin/bash\npython main.py\' | qsub -N "job" -l '
96 changes: 96 additions & 0 deletions continuous_integration/configure_slurm.py
@@ -0,0 +1,96 @@
# This file is taken from the Pulsar project
# (https://github.com/galaxyproject/pulsar), which is Apache 2.0 licensed.
#


from socket import gethostname
from string import Template
from subprocess import call
from getpass import getuser

SLURM_CONFIG_TEMPLATE = '''
# slurm.conf file generated by configurator.html.
# Put this file on all nodes of your cluster.
# See the slurm.conf man page for more information.
#
ControlMachine=$hostname
#ControlAddr=
#BackupController=
#BackupAddr=
#
AuthType=auth/munge
CacheGroups=0
#CheckpointType=checkpoint/none
CryptoType=crypto/munge
MpiDefault=none
#PluginDir=
#PlugStackConfig=
#PrivateData=jobs
ProctrackType=proctrack/pgid
#Prolog=
#PrologSlurmctld=
#PropagatePrioProcess=0
#PropagateResourceLimits=
#PropagateResourceLimitsExcept=
ReturnToService=1
#SallocDefaultCommand=
SlurmctldPidFile=/var/run/slurmctld.pid
SlurmctldPort=6817
SlurmdPidFile=/var/run/slurmd.pid
SlurmdPort=6818
SlurmdSpoolDir=/tmp/slurmd
SlurmUser=$user
#SlurmdUser=root
#SrunEpilog=
#SrunProlog=
StateSaveLocation=/tmp
SwitchType=switch/none
#TaskEpilog=
TaskPlugin=task/none
#TaskPluginParam=
#TaskProlog=
InactiveLimit=0
KillWait=30
MinJobAge=300
#OverTimeLimit=0
SlurmctldTimeout=120
SlurmdTimeout=300
#UnkillableStepTimeout=60
#VSizeFactor=0
Waittime=0
FastSchedule=2
SchedulerType=sched/backfill
SchedulerPort=7321
SelectType=select/linear
#SelectTypeParameters=
AccountingStorageType=accounting_storage/none
#AccountingStorageUser=
AccountingStoreJobComment=YES
ClusterName=cluster
#DebugFlags=
#JobCompHost=
#JobCompLoc=
#JobCompPass=
#JobCompPort=
JobCompType=jobcomp/none
#JobCompUser=
JobAcctGatherFrequency=30
JobAcctGatherType=jobacct_gather/none
SlurmctldDebug=3
#SlurmctldLogFile=
SlurmdDebug=3
#SlurmdLogFile=
NodeName=$hostname CPUs=1 State=UNKNOWN RealMemory=5000
PartitionName=debug Nodes=$hostname Default=YES MaxTime=INFINITE State=UP
'''


def main():
    template_params = {"hostname": gethostname(), "user": getuser()}
    config_contents = Template(SLURM_CONFIG_TEMPLATE).substitute(template_params)
    open("/etc/slurm-llnl/slurm.conf", "w").write(config_contents)
    call("slurmctld")
    call("slurmd")

if __name__ == "__main__":
    main()
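
The $hostname and $user placeholders in the configuration template are filled through string.Template. A standalone illustration with made-up values:

# Illustration of the Template mechanism; "worker-1" and "travis" are
# hypothetical values standing in for gethostname() and getuser().
from string import Template

conf = Template("ControlMachine=$hostname\nSlurmUser=$user")
print(conf.substitute({"hostname": "worker-1", "user": "travis"}))
# prints:
# ControlMachine=worker-1
# SlurmUser=travis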
6 changes: 6 additions & 0 deletions doc/whats_new.rst
@@ -8,7 +8,13 @@ dev

Changelog
---------
- Add travis workers with SLURM. By `Arnaud Joly`_

- Fix a possible deadlock caused by the use of ``subprocess.PIPE``.
  By `Arnaud Joly`_

- Add the possibility to specify a user in
  :func:`scheduler.queued_or_running_jobs`. By `Arnaud Joly`_

0.1
===