diff --git a/conf/celeryconfig.py b/conf/celeryconfig.py index ef286057..5f1d59ef 100644 --- a/conf/celeryconfig.py +++ b/conf/celeryconfig.py @@ -11,7 +11,7 @@ CELERY_IMPORTS = ("chronam.core.tasks",) -CELERYD_LOG_FILE = os.path.join("/logs/celery", "celery.log") +CELERYD_LOG_FILE = os.path.join("/var/log/celery", "celery.log") CELERYD_LOG_LEVEL = logging.INFO CELERYD_CONCURRENCY = 2 @@ -29,12 +29,17 @@ if "chronam.loc_cts" in INSTALLED_APPS: CELERY_IMPORTS += ("chronam.loc_cts.tasks",) - CELERYBEAT_SCHEDULE["poll_cts"] = ( - {"task": "chronam.loc_cts.tasks.poll_cts", "schedule": datetime.timedelta(hours=4), "args": ()}, - ) - CELERYBEAT_SCHEDULE["poll_purge"] = ( - {"task": "chronam.loc_cts.tasks.poll_purge", "schedule": crontab(hour=3, minute=0)}, - ) + CELERYBEAT_SCHEDULE["poll_cts"] = { + "task": "chronam.loc_cts.tasks.poll_cts", + "schedule": datetime.timedelta(hours=1), + "args": (), + } + + CELERYBEAT_SCHEDULE["poll_purge"] = { + "task": "chronam.loc_cts.tasks.poll_purge", + "schedule": crontab(hour=3, minute=0), + "args": (), + } CELERYBEAT_LOG_FILE = os.path.join("/var/log/celery", "celerybeat.log") diff --git a/loc_cts/cts.py b/loc_cts/cts.py index 85992556..245daa5a 100644 --- a/loc_cts/cts.py +++ b/loc_cts/cts.py @@ -1,15 +1,14 @@ import json import logging import urllib -import urlparse import requests +import urlparse logger = logging.getLogger(__name__) class CTS(object): - def __init__(self, username, password, base_url, verify_ssl=True): self.auth = (username, password) self.base_url = base_url @@ -18,104 +17,93 @@ def __init__(self, username, password, base_url, verify_ssl=True): self.verify_ssl = verify_ssl def get_project(self, project_id): - """get a Project using its project_id e.g. "ndnp" - """ + """get a Project using its project_id e.g. "ndnp" """ url = "project/%s" % project_id instance_data = self._request(url) return Project(instance_data, self) def next_service_request(self, queue_name, service_type): - """get the next ServiceRequest for a given queue and service type - """ - q = {'queue': queue_name, 'serviceType': service_type} + """get the next ServiceRequest for a given queue and service type""" + q = {"queue": queue_name, "serviceType": service_type} d = self._request("service_request/serve_next", "put", data=q) if not d: return None return ServiceRequest(d, self) def get_service_requests(self, queue_name, service_type): - """get a list of ServiceRequests for a given queue and service type - """ - q = {'queue': queue_name, 'serviceType': service_type} + """get a list of ServiceRequests for a given queue and service type""" + q = {"queue": queue_name, "serviceType": service_type} d = self._request("service_requests", params=q) - for request in d['results']: + for request in d["results"]: sr = ServiceRequest(request, self) sr.reload() yield sr def get_service_request(self, key): - """get a ServiceRequest using its key - """ + """get a ServiceRequest using its key""" d = self._request("service_request/%s" % key) return ServiceRequest(d, self) def get_bag_instance(self, key): - """get a BagInstance using its key - """ + """get a BagInstance using its key""" url = "inventory/bag_instance/%s" % key instance_data = self._request(url) return BagInstance(instance_data, self) - def _request(self, url, method='get', params={}, data=None): + def _request(self, url, method="get", params={}, data=None): headers = {"accept": "application/json"} url = urlparse.urljoin(self.base_url, url) if data is not None: data = urllib.urlencode(data) headers["content-type"] = "application/x-www-form-urlencoded" - r = requests.request(method, url, - params=params, - data=data, - headers=headers, - auth=self.auth, - verify=self.verify_ssl) + r = requests.request( + method, url, params=params, data=data, headers=headers, auth=self.auth, verify=self.verify_ssl + ) if r.status_code == 200: return json.loads(r.content) elif r.status_code == 301 or r.status_code == 302: - return self._request(r.headers['location'], method, params, data) + return self._request(r.headers["location"], method, params, data) elif r.status_code == 204: return None else: - logger.error("%s %s with %s resulted in %s", method, url, params, - r.status_code) + logger.error("%s %s with %s resulted in %s", method, url, params, r.status_code) return None class Resource(object): - def __init__(self, json_data, cts): self.data = json_data self.cts = cts def reload(self): - url = self.link('self') + url = self.link("self") self.data = self.cts._request(url) @property def url(self): - return urlparse.urljoin(self.cts.base_url, self.link('self')) + return urlparse.urljoin(self.cts.base_url, self.link("self")) def link(self, rel): "returns the first link data with a given rel value" - for link in self.data.get('links', []): - if link.get('rel', None) == rel: - return link['href'] + for link in self.data.get("links", []): + if link.get("rel", None) == rel: + return link["href"] return None class ServiceRequest(Resource): - def requeue(self): - return self.cts._request(self.url, 'put', data={"action": "requeue"}) + return self.cts._request(self.url, "put", data={"action": "requeue"}) def complete(self): q = {"action": "respond", "result": "COMPLETED"} - return self.cts._request(self.url, 'put', data=q) + return self.cts._request(self.url, "put", data=q) def fail(self, message): q = {"action": "respond", "result": "FAILURE", "errorMessage": message} - return self.cts._request(self.url, 'put', data=q) + return self.cts._request(self.url, "put", data=q) class BagInstance(Resource): @@ -123,16 +111,14 @@ class BagInstance(Resource): class Bag(Resource): - def get_bag_instances(self): - instances = self.cts._request(self.link('bag_instances')) + instances = self.cts._request(self.link("bag_instances")) for instance_data in instances: yield BagInstance(instance_data, self.cts) class Project(Resource): - def get_bags(self): - bags = self.cts._request(self.link('bags')) + bags = self.cts._request(self.link("bags")) for bag_data in bags: yield Bag(bag_data, self.cts) diff --git a/loc_cts/tasks.py b/loc_cts/tasks.py index d12a1cd4..5570927a 100644 --- a/loc_cts/tasks.py +++ b/loc_cts/tasks.py @@ -5,10 +5,11 @@ from celery.decorators import task from django.conf import settings -from chronam.core import cts from chronam.core.models import Batch from chronam.core.tasks import load_batch, purge_batch +from . import cts + logger = logging.getLogger(__name__) @@ -24,17 +25,17 @@ def poll_purge(): logger.info("no purge service requests") break - logger.info('got purge service request: %s', req.url) - bag_instance_key = req.data['requestParameters']['baginstancekey'] + logger.info("got purge service request: %s", req.url) + bag_instance_key = req.data["requestParameters"]["baginstancekey"] bag_instance = cts.get_bag_instance(bag_instance_key) - batch_name = os.path.basename(bag_instance.data['filepath']) - logger.info('purging %s', batch_name) + batch_name = os.path.basename(bag_instance.data["filepath"]) + logger.info("purging %s", batch_name) # if the batch isn't there no need to purge try: if Batch.objects.filter(name=batch_name).count() == 0: - logger.info('no need to purge %s ; it is not loaded', batch_name) - logger.info('batch %s purged', batch_name) + logger.info("no need to purge %s ; it is not loaded", batch_name) + logger.info("batch %s purged", batch_name) else: purge_batch(batch_name, req) except Exception as e: @@ -60,9 +61,9 @@ def poll_cts(): # determine the location of the bag on the filesystem logger.info("got service request: %s", sr) - bag_instance_id = sr.data['requestParameters']['baginstancekey'] + bag_instance_id = sr.data["requestParameters"]["baginstancekey"] bag = c.get_bag_instance(bag_instance_id) - bag_dir = bag.data['filepath'] + bag_dir = bag.data["filepath"] try: logger.info("loading %s", bag_dir)