Skip to content

Commit

Permalink
Fix script invalidation to use checksums (wooey#286)
Browse files Browse the repository at this point in the history
* Fix script invalidation to use checksums

* Test getting latest script

* Fix cleanup_dead_jobs import
  • Loading branch information
Chris7 authored Jun 20, 2019
1 parent 6919685 commit 3543bad
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 30 deletions.
50 changes: 36 additions & 14 deletions wooey/tasks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from __future__ import absolute_import
import subprocess
import tarfile
import os
import zipfile
import six
import subprocess
import sys
import tarfile
import tempfile
import traceback

import zipfile
from threading import Thread

import six
from django.utils.text import get_valid_filename
from django.core.files import File
from django.conf import settings
Expand All @@ -18,6 +18,7 @@
from celery.schedules import crontab
from celery.signals import worker_process_init

from .backend import utils
from . import settings as wooey_settings

try:
Expand Down Expand Up @@ -74,11 +75,38 @@ class WooeyTask(Task):
# job.save()


def get_latest_script(script_version):
    """Downloads the latest script version to the local storage.

    If the script is absent from local storage it is saved there. If a local
    copy exists, its checksum is compared against ``script_version.checksum``;
    on a mismatch the local copy is replaced with the contents of
    ``script_version.script_path.file``.

    :param script_version: :py:class:`~wooey.models.core.ScriptVersion`
    :return: boolean
        Returns true if a new version was downloaded.
    """
    script_path = script_version.script_path
    script_name = script_path.name
    local_storage = utils.get_storage(local=True)

    if not local_storage.exists(script_name):
        local_storage.save(script_name, script_path.file)
        return True

    # If script exists, make sure the version is valid, otherwise fetch a new one.
    # The handle is closed deterministically rather than left to the garbage collector.
    with local_storage.open(script_name) as local_file:
        local_checksum = utils.get_checksum(buff=local_file.read())
    if local_checksum == script_version.checksum:
        return False

    # The local copy is stale: replace it with the contents of the script
    # version's own file (presumably the authoritative, possibly remote, copy --
    # confirm against the storage configuration), NOT with the stale bytes that
    # just failed the checksum comparison.
    with tempfile.TemporaryFile() as tf:
        # Buffer the new contents before deleting anything, in case the field's
        # storage and the local storage resolve to the same underlying file.
        for chunk in script_path.file.chunks():
            tf.write(chunk)
        tf.seek(0)
        # Delete first so that save() reuses the same name; Django storages
        # otherwise pick an alternative name when the target already exists.
        local_storage.delete(script_name)
        local_storage.save(script_name, tf)
    return True


@celery_app.task(base=WooeyTask)
def submit_script(**kwargs):
job_id = kwargs.pop('wooey_job')
resubmit = kwargs.pop('wooey_resubmit', False)
from .backend import utils
from .models import WooeyJob, UserFile
job = WooeyJob.objects.get(pk=job_id)

Expand All @@ -98,14 +126,8 @@ def submit_script(**kwargs):
utils.mkdirs(abscwd)
# make sure we have the script, otherwise download it. This can happen if we have an ephemeral file system or are
# executing jobs on a worker node.
script_path = job.script_version.script_path
script_update_time = job.script_version.modified_date
local_storage = utils.get_storage(local=True)
script_exists = local_storage.exists(script_path.name)
if not script_exists or (local_storage.get_modified_time(script_path.name) < script_update_time):
if script_exists:
local_storage.delete(script_path.name)
local_storage.save(script_path.name, script_path.file)
get_latest_script(job.script_version)


job.status = WooeyJob.RUNNING
job.save()
Expand Down
6 changes: 3 additions & 3 deletions wooey/tests/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ def tearDown(self):
print('unable to delete {}'.format(path))
super(FileCleanupMixin, self).tearDown()


class ScriptFactoryMixin(object):
class ScriptTearDown(object):
def tearDown(self):
for i in ScriptVersion.objects.all():
name = i.script_path.name
Expand All @@ -51,8 +50,9 @@ def tearDown(self):
utils.get_storage().delete(name)
except WindowsError:
print('unable to delete {}'.format(name))
super(ScriptFactoryMixin, self).tearDown()
super(ScriptTearDown, self).tearDown()

class ScriptFactoryMixin(ScriptTearDown, object):
def setUp(self):
self.translate_script = factories.generate_script(os.path.join(config.WOOEY_TEST_SCRIPTS, 'translate.py'))
self.choice_script = factories.generate_script(os.path.join(config.WOOEY_TEST_SCRIPTS, 'choices.py'))
Expand Down
70 changes: 57 additions & 13 deletions wooey/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import mock
import os
from datetime import timedelta

from django.test import TestCase

from . import mixins, factories
from .. import (
models,
settings as wooey_settings,
tasks,
from wooey import settings as wooey_settings
from wooey.backend.utils import add_wooey_script
from wooey.models import (
WooeyJob,
)
from wooey.tasks import (
cleanup_dead_jobs,
get_latest_script,
)

from . import config, mixins, factories

class TaskTests(mixins.ScriptFactoryMixin, TestCase):

Expand Down Expand Up @@ -44,29 +49,68 @@ def test_job_cleanup(self):
cleanup_wooey_jobs()
self.assertListEqual(list(WooeyJob.objects.all()), [])


class TestGetLatestScript(mixins.FileMixin, mixins.ScriptTearDown, TestCase):
    """Tests for ``get_latest_script`` using the ``versioned_script`` fixture scripts."""

    def setUp(self):
        super(TestGetLatestScript, self).setUp()
        self.first_version = self._register_version('v1.py')

    def _register_version(self, filename):
        """Store a ``versioned_script`` fixture, add it as 'test_versions', and
        return the resulting script version after renaming it.

        ``self.storage`` and ``self.filename_func`` are presumably supplied by
        ``mixins.FileMixin`` -- confirm against the mixin.
        """
        fixture = os.path.join(config.WOOEY_TEST_SCRIPTS, 'versioned_script', filename)
        with open(fixture) as handle:
            stored_name = self.storage.save(self.filename_func(filename), handle)
        added = add_wooey_script(script_path=stored_name, script_name='test_versions')
        return self.rename_script(added['script'])

    def rename_script(self, script_version):
        """Re-save the script's file and point the version at the name returned by the storage."""
        # Because we are on local storage, the script uploaded will already be present, so
        # we rename it to mimic it being absent on a worker node
        current = script_version.script_path
        renamed = self.storage.save(current.name, current.file)
        script_version.script_path.name = renamed
        script_version.save()
        return script_version

    def test_get_latest_script_loads_initial(self):
        downloaded = get_latest_script(self.first_version)
        self.assertTrue(downloaded)

    def test_get_latest_script_doesnt_redownload_same_script(self):
        first_call = get_latest_script(self.first_version)
        self.assertTrue(first_call)
        second_call = get_latest_script(self.first_version)
        self.assertFalse(second_call)

    def test_get_latest_script_downloads_new_script(self):
        get_latest_script(self.first_version)

        # Update the script version
        second_version = self._register_version('v2.py')

        self.assertTrue(get_latest_script(second_version))


class TestCleanupDeadJobs(mixins.ScriptFactoryMixin, TestCase):
def test_handles_unresponsive_workers(self):
# Ensure that if we cannot connect to celery, we do nothing.
with mock.patch('wooey.tasks.celery_app.control.inspect') as inspect_mock:
running_job = factories.generate_job(self.translate_script)
running_job.status = models.WooeyJob.RUNNING
running_job.status = WooeyJob.RUNNING
running_job.save()

inspect_mock.return_value = mock.Mock(
active=mock.Mock(
return_value=None,
)
)
tasks.cleanup_dead_jobs()
self.assertEqual(models.WooeyJob.objects.get(pk=running_job.id).status, models.WooeyJob.RUNNING)
cleanup_dead_jobs()
self.assertEqual(WooeyJob.objects.get(pk=running_job.id).status, WooeyJob.RUNNING)

def test_cleans_up_dead_jobs(self):
# Make a job that is running but not active, and a job that is running and active.
dead_job = factories.generate_job(self.translate_script)
dead_job.status = models.WooeyJob.RUNNING
dead_job.status = WooeyJob.RUNNING
dead_job.save()
active_job = factories.generate_job(self.translate_script)
active_job.status = models.WooeyJob.RUNNING
active_job.status = WooeyJob.RUNNING
active_job.celery_id = 'celery-id'
active_job.save()
with mock.patch('wooey.tasks.celery_app.control.inspect') as inspect_mock:
Expand All @@ -81,8 +125,8 @@ def test_cleans_up_dead_jobs(self):
},
)
)
tasks.cleanup_dead_jobs()
cleanup_dead_jobs()

# Assert the dead job is updated
self.assertEqual(models.WooeyJob.objects.get(pk=dead_job.id).status, models.WooeyJob.FAILED)
self.assertEqual(models.WooeyJob.objects.get(pk=active_job.id).status, models.WooeyJob.RUNNING)
self.assertEqual(WooeyJob.objects.get(pk=dead_job.id).status, WooeyJob.FAILED)
self.assertEqual(WooeyJob.objects.get(pk=active_job.id).status, WooeyJob.RUNNING)

0 comments on commit 3543bad

Please sign in to comment.