Skip to content

Commit

Permalink
Solves issue 702
Browse files Browse the repository at this point in the history
In order to solve issue 702 we have to check whether a work-horse
terminated unexpectedly (by inspecting the exit code of the work-horse
process). If it exited unexpectedly we check if the job has either been
marked as finished, failed or other valid states. If it's not in any
valid state we mark it as failed and move it to the failed queue.
Since the process was terminated unexpectedly (think OOM) we do not
have any exception context and we can't run any custom exception handlers.

There is still a chance that the job will finish successfully but the
work-horse process will be killed before the job is marked as finished
and we will erroneously mark it as failed. The users should take care
to write idempotent jobs.
  • Loading branch information
Yannis Spiliopoulos committed Jun 30, 2016
1 parent 93d286a commit f9d5897
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,25 @@ def monitor_work_horse(self, job):
while True:
try:
_, ret_val = os.waitpid(self._horse_pid, 0)
if not (ret_val == os.EX_OK):
job_status = job.get_status()
if job_status is None:
# Job completed and its ttl has expired
break
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
with self.connection._pipeline() as pipeline:
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry = StartedJobRegistry(job.origin, self.connection)
started_job_registry.remove(job, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
try:
pipeline.execute()
except Exception:
pass
self.move_to_failed_queue_unhandled(
job,
"Work-horse proccess was terminated unexpectedly"
)
break
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
Expand Down Expand Up @@ -690,6 +709,11 @@ def move_to_failed_queue(self, job, *exc_info):
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
self.failed_queue.quarantine(job, exc_info=exc_string)

def move_to_failed_queue_unhandled(self, job, message):
"""Unhandled failure default handler: move the job to the failed queue."""
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
self.failed_queue.quarantine(job, exc_info=message)

def push_exc_handler(self, handler_func):
"""Pushes an exception handler onto the exc handler stack."""
self._exc_handlers.append(handler_func)
Expand Down

0 comments on commit f9d5897

Please sign in to comment.