Skip to content

Commit

Permalink
Add module system (#202)
Browse files Browse the repository at this point in the history
* Rename restful-tango to restful_tango as modules cannot have dashes

* Replace the bad `from module import *` (wildcard import) practice with explicit imports

* Make root level a module

* Fix server.py relative imports

* tango.py better formatting

* Resolve tango.py and jobManager.py circular imports

* jobManager.py reorder imports

* Reorder imports in server.py and tangoREST.py

* Change restful-tango instances to restful_tango
  • Loading branch information
fanpu authored Jan 13, 2021
1 parent 6b27f36 commit 855b9e9
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 41 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ A brief overview of the Tango repository:
* `worker.py` - Shepherds a job through its execution
* `preallocator.py` - Manages pools of VMs
* `vmms/` - VMMS library implementations
* `restful-tango/` - HTTP server layer on the main Tango
* `restful_tango/` - HTTP server layer on the main Tango

Tango was developed as a distributed grading system for [Autolab](https://github.com/autolab/Autolab) at Carnegie Mellon University and has been extensively used for autograding programming assignments in CMU courses.

Expand Down
Empty file added __init__.py
Empty file.
2 changes: 1 addition & 1 deletion deployment/config/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10

[program:tango]
command=/bin/bash -c 'sleep 5 && python3 /opt/TangoService/Tango/restful-tango/server.py 3000'
command=/bin/bash -c 'sleep 5 && python3 /opt/TangoService/Tango/restful_tango/server.py 3000'
autostart=true
redirect_stderr=true
stdout_logfile=/opt/TangoService/tango_log.log
Expand Down
30 changes: 19 additions & 11 deletions jobManager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import print_function

#
# JobManager - Thread that assigns jobs to worker threads
#
Expand All @@ -10,20 +11,26 @@
# is launched that will handle things from here on. If anything goes
# wrong, the job is made dead with the error.
#

import copy
import time
import logging
import threading

from builtins import str
from builtins import object
from datetime import datetime
from future import standard_library
standard_library.install_aliases()
from builtins import str
import threading, logging, time, copy

from datetime import datetime
from tango import *
from jobQueue import JobQueue
from preallocator import Preallocator
import tango # Written this way to avoid circular imports
from config import Config
from tangoObjects import TangoQueue
from worker import Worker
from preallocator import Preallocator
from jobQueue import JobQueue

standard_library.install_aliases()

from tangoObjects import TangoQueue
from config import Config

class JobManager(object):

Expand Down Expand Up @@ -97,7 +104,8 @@ def __manage(self):
self.log.info("Dispatched job %s:%d to %s [try %d]" %
(job.name, job.id, preVM.name, job.retries))
else:
self.log.info("Unable to pre-allocate a vm for job job %s:%d [try %d]" % (job.name, job.id, job.retries))
self.log.info(
"Unable to pre-allocate a vm for job job %s:%d [try %d]" % (job.name, job.id, job.retries))

job.appendTrace(
"%s|Dispatched job %s:%d [try %d]" %
Expand All @@ -121,7 +129,7 @@ def __manage(self):
print("You need to have Redis running to be able to initiate stand-alone\
JobManager")
else:
tango = TangoServer()
tango = tango.TangoServer()
tango.log.debug("Resetting Tango VMs")
tango.resetTango(tango.preallocator.vmms)
for key in tango.preallocator.machines.keys():
Expand Down
Empty file added restful_tango/__init__.py
Empty file.
4 changes: 2 additions & 2 deletions restful-tango/server.py → restful_tango/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from concurrent.futures import ThreadPoolExecutor
from functools import partial, wraps

import tangoREST
from tangoREST import TangoREST
from config import Config

tangoREST = tangoREST.TangoREST()
tangoREST = TangoREST()
EXECUTOR = ThreadPoolExecutor(max_workers=4)

# Regex for the resources
Expand Down
5 changes: 3 additions & 2 deletions restful-tango/tangoREST.py → restful_tango/tangoREST.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
# interface of Tango.
#

from builtins import object
from builtins import str
import sys
import os
import inspect
import hashlib
import json
import logging

from builtins import object
from builtins import str

currentdir = os.path.dirname(
os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
Expand Down
58 changes: 34 additions & 24 deletions tango.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# 1. The Restful API: This is the interface for Tango that receives
# requests from clients via HTTP. AddJob requests are converted
# into a form that the tangoServer understands and then passed on
# to an instance of the tangoServer class. (restful-tango/*)
# to an instance of the tangoServer class. (restful_tango/*)
#
# 2. The TangoServer Class: This is a class that accepts addJob requests
# from the restful server. Job requests are validated and placed in
Expand Down Expand Up @@ -34,15 +34,20 @@
# the pool, the preallocator creates another instance and adds it
# to the pool. (preallocator.py)

import threading
import logging
import time
import stat
import re
import os

from builtins import object
from builtins import str
import threading, logging, time, stat, re, os

from datetime import datetime

from jobManager import JobManager
from preallocator import Preallocator
from jobQueue import JobQueue
from jobManager import JobManager

from tangoObjects import TangoJob
from config import Config

Expand All @@ -54,7 +59,7 @@ class TangoServer(object):

def __init__(self):
self.daemon = True

vmms = None
if Config.VMMS_NAME == "tashiSSH":
from vmms.tashiSSH import TashiSSH
Expand All @@ -76,7 +81,7 @@ def __init__(self):
# memory between processes. Otherwise, JobManager will
# be initiated separately
JobManager(self.jobQueue).start()

logging.basicConfig(
filename=Config.LOGFILE,
format="%(levelname)s|%(asctime)s|%(name)s|%(message)s",
Expand Down Expand Up @@ -194,7 +199,7 @@ def getInfo(self):
""" getInfo - return various statistics about the Tango daemon
"""
stats = {}
stats['elapsed_secs'] = time.time() - self.start_time;
stats['elapsed_secs'] = time.time() - self.start_time
stats['job_requests'] = Config.job_requests
stats['job_retries'] = Config.job_retries
stats['waitvm_timeouts'] = Config.waitvm_timeouts
Expand All @@ -203,7 +208,7 @@ def getInfo(self):
stats['runjob_errors'] = Config.runjob_errors
stats['copyout_errors'] = Config.copyout_errors
stats['num_threads'] = threading.activeCount()

return stats

#
Expand All @@ -223,7 +228,8 @@ def resetTango(self, vmms):
for vmms_name in vmms:
vobj = vmms[vmms_name]
vms = vobj.getVMs()
self.log.debug("Pre-existing VMs: %s" % [vm.name for vm in vms])
self.log.debug("Pre-existing VMs: %s" %
[vm.name for vm in vms])
namelist = []
for vm in vms:
if re.match("%s-" % Config.PREFIX, vm.name):
Expand All @@ -233,7 +239,7 @@ def resetTango(self, vmms):
namelist.append(vm.name)
if namelist:
self.log.warning("Killed these %s VMs on restart: %s" %
(vmms_name, namelist))
(vmms_name, namelist))

for _, job in self.jobQueue.liveJobs.items():
if not job.isNotAssigned():
Expand All @@ -242,10 +248,9 @@ def resetTango(self, vmms):
(str(job.name), str(job.assigned)))
except Exception as err:
self.log.error("resetTango: Call to VMMS %s failed: %s" %
(vmms_name, err))
(vmms_name, err))
os._exit(1)


def __validateJob(self, job, vmms):
""" validateJob - validate the input arguments in an addJob request.
"""
Expand Down Expand Up @@ -279,7 +284,7 @@ def __validateJob(self, job, vmms):
imgList = vobj.getImages()
if job.vm.image not in imgList:
self.log.error("validateJob: Image not found: %s" %
job.vm.image)
job.vm.image)
job.appendTrace("%s|validateJob: Image not found: %s" %
(datetime.utcnow().ctime(), job.vm.image))
errors += 1
Expand All @@ -294,27 +299,30 @@ def __validateJob(self, job, vmms):
errors += 1
else:
if job.vm.vmms not in vmms:
self.log.error("validateJob: Invalid vmms name: %s" % job.vm.vmms)
self.log.error(
"validateJob: Invalid vmms name: %s" % job.vm.vmms)
job.appendTrace("%s|validateJob: Invalid vmms name: %s" %
(datetime.utcnow().ctime(), job.vm.vmms))
errors += 1

# Check the output file
if not job.outputFile:
self.log.error("validateJob: Missing job.outputFile")
job.appendTrace("%s|validateJob: Missing job.outputFile" % (datetime.utcnow().ctime()))
job.appendTrace("%s|validateJob: Missing job.outputFile" %
(datetime.utcnow().ctime()))
errors += 1
else:
if not os.path.exists(os.path.dirname(job.outputFile)):
self.log.error("validateJob: Bad output path: %s", job.outputFile)
self.log.error(
"validateJob: Bad output path: %s", job.outputFile)
job.appendTrace("%s|validateJob: Bad output path: %s" %
(datetime.utcnow().ctime(), job.outputFile))
errors += 1

# Check for max output file size parameter
if not job.maxOutputFileSize:
self.log.debug("validateJob: Setting job.maxOutputFileSize "
"to default value: %d bytes", Config.MAX_OUTPUT_FILE_SIZE)
"to default value: %d bytes", Config.MAX_OUTPUT_FILE_SIZE)
job.maxOutputFileSize = Config.MAX_OUTPUT_FILE_SIZE

# Check the list of input files
Expand All @@ -323,11 +331,12 @@ def __validateJob(self, job, vmms):
if not inputFile.localFile:
self.log.error("validateJob: Missing inputFile.localFile")
job.appendTrace("%s|validateJob: Missing inputFile.localFile" %
(datetime.utcnow().ctime()))
(datetime.utcnow().ctime()))
errors += 1
else:
if not os.path.exists(os.path.dirname(job.outputFile)):
self.log.error("validateJob: Bad output path: %s", job.outputFile)
self.log.error(
"validateJob: Bad output path: %s", job.outputFile)
job.appendTrace("%s|validateJob: Bad output path: %s" %
(datetime.utcnow().ctime(), job.outputFile))
errors += 1
Expand All @@ -338,20 +347,21 @@ def __validateJob(self, job, vmms):
# Check if input files include a Makefile
if not hasMakefile:
self.log.error("validateJob: Missing Makefile in input files.")
job.appendTrace("%s|validateJob: Missing Makefile in input files." % (datetime.utcnow().ctime()))
errors+=1
job.appendTrace("%s|validateJob: Missing Makefile in input files." % (
datetime.utcnow().ctime()))
errors += 1

# Check if job timeout has been set; If not set timeout to default
if not job.timeout or job.timeout <= 0:
self.log.debug("validateJob: Setting job.timeout to"
" default config value: %d secs", Config.RUNJOB_TIMEOUT)
" default config value: %d secs", Config.RUNJOB_TIMEOUT)
job.timeout = Config.RUNJOB_TIMEOUT

# Any problems, return an error status
if errors > 0:
self.log.error("validateJob: Job rejected: %d errors" % errors)
job.appendTrace("%s|validateJob: Job rejected: %d errors" %
(datetime.utcnow().ctime(), errors))
(datetime.utcnow().ctime(), errors))
return -1
else:
return 0

0 comments on commit 855b9e9

Please sign in to comment.