Skip to content

Commit

Permalink
Merge pull request rq#715 from samuelcolvin/heroku-worker
Browse files Browse the repository at this point in the history
adding heroku worker
  • Loading branch information
selwin authored Jun 21, 2016
2 parents 0c5fe62 + 7efd036 commit 3b12d10
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 16 deletions.
6 changes: 6 additions & 0 deletions rq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ def __init__(self, message, raw_data, inner_exception=None):

class DequeueTimeout(Exception):
pass


class ShutDownImminentException(Exception):
def __init__(self, msg, extra_info):
self.extra_info = extra_info
super(ShutDownImminentException, self).__init__(msg)
68 changes: 58 additions & 10 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from .connections import get_current_connection, push_connection, pop_connection
from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
from .exceptions import DequeueTimeout
from .exceptions import DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue, get_failed_queue
Expand Down Expand Up @@ -371,8 +371,7 @@ def request_stop(self, signum, frame):
signal.signal(signal.SIGINT, self.request_force_stop)
signal.signal(signal.SIGTERM, self.request_force_stop)

msg = 'Warm shut down requested'
self.log.warning(msg)
self.handle_warm_shutdown_request()

# If shutdown is requested in the middle of a job, wait until
# finish before shutting down and save the request in redis
Expand All @@ -384,6 +383,9 @@ def request_stop(self, signum, frame):
else:
raise StopRequested()

def handle_warm_shutdown_request(self):
self.log.warning('Warm shut down requested')

def check_for_suspension(self, burst):
"""Check to see if workers have been suspended by `rq suspend`"""

Expand Down Expand Up @@ -537,13 +539,7 @@ def main_work_horse(self, job, queue):
# that are different from the worker.
random.seed()

# Always ignore Ctrl+C in the work horse, as it might abort the
# currently running job.
# The main worker catches the Ctrl+C and requests graceful shutdown
# after the current work is done. When cold shutdown is requested, it
# kills the current job anyway.
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.setup_work_horse_signals()

self._is_horse = True
self.log = logger
Expand All @@ -554,6 +550,16 @@ def main_work_horse(self, job, queue):
# constrast to the regular sys.exit()
os._exit(int(not success))

def setup_work_horse_signals(self):
"""Setup signal handing for the newly spawned work horse."""
# Always ignore Ctrl+C in the work horse, as it might abort the
# currently running job.
# The main worker catches the Ctrl+C and requests graceful shutdown
# after the current work is done. When cold shutdown is requested, it
# kills the current job anyway.
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)

def prepare_job_execution(self, job):
"""Performs misc bookkeeping like updating states prior to
job execution.
Expand Down Expand Up @@ -714,3 +720,45 @@ def main_work_horse(self, *args, **kwargs):
def execute_job(self, *args, **kwargs):
"""Execute job in same thread/process, do not fork()"""
return self.perform_job(*args, **kwargs)


class HerokuWorker(Worker):
"""
Modified version of rq worker which:
* stops work horses getting killed with SIGTERM
* sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
causes the horse to crash `imminent_shutdown_delay` seconds later
"""
imminent_shutdown_delay = 6
frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value',
'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace']

def setup_work_horse_signals(self):
"""Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN"""
signal.signal(signal.SIGRTMIN, self.request_stop_sigrtmin)
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)

def handle_warm_shutdown_request(self):
"""If horse is alive send it SIGRTMIN"""
if self.horse_pid != 0:
self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal')
os.kill(self.horse_pid, signal.SIGRTMIN)
else:
self.log.warning('Warm shut down requested, no horse found')

def request_stop_sigrtmin(self, signum, frame):
if self.imminent_shutdown_delay == 0:
logger.warn('Imminent shutdown, raising ShutDownImminentException immediately')
self.request_force_stop_sigrtmin(signum, frame)
else:
logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds',
self.imminent_shutdown_delay)
signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
signal.alarm(self.imminent_shutdown_delay)

def request_force_stop_sigrtmin(self, signum, frame):
info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties)
logger.warn('raising ShutDownImminentException to cancel job...')
raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info)
25 changes: 24 additions & 1 deletion tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import os
import time

from rq import Connection, get_current_job, get_current_connection
from rq import Connection, get_current_job, get_current_connection, Queue
from rq.decorators import job
from rq.compat import PY2
from rq.worker import HerokuWorker


def say_pid():
Expand Down Expand Up @@ -102,3 +103,25 @@ def black_hole(job, *exc_info):
def long_running_job(timeout=10):
time.sleep(timeout)
return 'Done sleeping...'


def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay):
"""
Run the work horse for a simplified heroku worker where perform_job just
creates two sentinel files 2 seconds apart.
:param sandbox: directory to create files in
:param _imminent_shutdown_delay: delay to use for HerokuWorker
"""
class TestHerokuWorker(HerokuWorker):
imminent_shutdown_delay = _imminent_shutdown_delay

def perform_job(self, job, queue):
create_file(os.path.join(sandbox, 'started'))
# have to loop here rather than one sleep to avoid holding the GIL
# and preventing signals being received
for i in range(20):
time.sleep(0.1)
create_file(os.path.join(sandbox, 'finished'))

w = TestHerokuWorker(Queue('dummy'))
w.main_work_horse(None, None)
97 changes: 92 additions & 5 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
unicode_literals)

import os
import shutil
from datetime import timedelta
from time import sleep
import signal
Expand All @@ -13,7 +14,7 @@
from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid,
access_self)
run_dummy_heroku_worker, access_self)
from tests.helpers import strip_microseconds

from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
Expand All @@ -23,6 +24,7 @@
from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow
from rq.worker import HerokuWorker


class CustomJob(Job):
Expand Down Expand Up @@ -473,6 +475,7 @@ def test_suspend_worker_execution(self):
assert q.count == 0
self.assertEqual(os.path.exists(SENTINEL_FILE), True)

@slow
def test_suspend_with_duration(self):
q = Queue()
for _ in range(5):
Expand Down Expand Up @@ -575,7 +578,7 @@ def kill_worker(pid, double_kill):
os.kill(pid, signal.SIGTERM)


class TestWorkerShutdown(RQTestCase):
class TimeoutTestCase:
def setUp(self):
# we want tests to fail if signal are ignored and the work remain
# running, so set a signal to kill them after X seconds
Expand All @@ -588,6 +591,8 @@ def _timeout(self, signal, frame):
"test still running after %i seconds, likely the worker wasn't shutdown correctly" % self.killtimeout
)


class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
@slow
def test_idle_worker_warm_shutdown(self):
"""worker with no ongoing job receiving single SIGTERM signal and shutting down"""
Expand Down Expand Up @@ -616,12 +621,12 @@ def test_working_worker_warm_shutdown(self):
w.work()

p.join(2)
self.assertFalse(p.is_alive())
self.assertTrue(w._stop_requested)
self.assertTrue(os.path.exists(sentinel_file))

shutdown_requested_date = w.shutdown_requested_date
self.assertIsNotNone(shutdown_requested_date)
self.assertEqual(type(shutdown_requested_date).__name__, 'datetime')
self.assertIsNotNone(w.shutdown_requested_date)
self.assertEqual(type(w.shutdown_requested_date).__name__, 'datetime')

@slow
def test_working_worker_cold_shutdown(self):
Expand Down Expand Up @@ -675,3 +680,85 @@ def test_run_scheduled_access_self(self):
subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
assert get_failed_queue().count == 0
assert q.count == 0


class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
def setUp(self):
super(HerokuWorkerShutdownTestCase, self).setUp()
self.sandbox = '/tmp/rq_shutdown/'
os.makedirs(self.sandbox)

def tearDown(self):
shutil.rmtree(self.sandbox, ignore_errors=True)

@slow
def test_immediate_shutdown(self):
"""Heroku work horse shutdown with immediate (0 second) kill"""
p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 0))
p.start()
time.sleep(0.5)

os.kill(p.pid, signal.SIGRTMIN)

p.join(2)
self.assertEqual(p.exitcode, 1)
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))

@slow
def test_1_sec_shutdown(self):
"""Heroku work horse shutdown with 1 second kill"""
p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 1))
p.start()
time.sleep(0.5)

os.kill(p.pid, signal.SIGRTMIN)
time.sleep(0.1)
self.assertEqual(p.exitcode, None)
p.join(2)
self.assertEqual(p.exitcode, 1)

self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))

@slow
def test_shutdown_double_sigrtmin(self):
"""Heroku work horse shutdown with long delay but SIGRTMIN sent twice"""
p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 10))
p.start()
time.sleep(0.5)

os.kill(p.pid, signal.SIGRTMIN)
# we have to wait a short while otherwise the second signal wont bet processed.
time.sleep(0.1)
os.kill(p.pid, signal.SIGRTMIN)
p.join(2)
self.assertEqual(p.exitcode, 1)

self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))

def test_handle_shutdown_request(self):
"""Mutate HerokuWorker so _horse_pid refers to an artificial process
and test handle_warm_shutdown_request"""
w = HerokuWorker('foo')

path = os.path.join(self.sandbox, 'shouldnt_exist')
p = Process(target=create_file_after_timeout, args=(path, 2))
p.start()
self.assertEqual(p.exitcode, None)

w._horse_pid = p.pid
w.handle_warm_shutdown_request()
p.join(2)
self.assertEqual(p.exitcode, -34)
self.assertFalse(os.path.exists(path))

def test_handle_shutdown_request_no_horse(self):
"""Mutate HerokuWorker so _horse_pid refers to non existent process
and test handle_warm_shutdown_request"""
w = HerokuWorker('foo')

w._horse_pid = 19999
with self.assertRaises(OSError):
w.handle_warm_shutdown_request()

0 comments on commit 3b12d10

Please sign in to comment.