Skip to content

Commit

Permalink
Reverts to placing jobs at the end of the queue
Browse files Browse the repository at this point in the history
 - It is impossible to preserve job ordering due to interleaving of
   dequeueing/enqueueing with multiple workers by lpushing jobs back onto the
   resque queue.

   Assume N workers.  Assume N+1 jobs for mutex A.  Until job1 is complete
   workers will grab jobs and place them back on the front of the queue in an
   indeterminate order.

   Therefore, we default to re-enqueuing jobs at the end of the queue to avoid
   temporary starvation where jobs associated with mutex A can block jobs for
   all other mutexes.

 - Rename lpush to reenqueue so that users of resque-lonely_job can overwrite
   job re-enqueueing behavior as needed.
  • Loading branch information
wallace committed Jun 13, 2012
1 parent 5ca0034 commit e9912fb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
8 changes: 4 additions & 4 deletions lib/resque-lonely_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ def unlock_queue(*args)
# role our own. This is based on Resque.push but we don't need to
# call Resque.watch_queue as the queue should already exist if we're
# unable to get the lock.
def lpush(*args)
Resque.redis.lpush("queue:#{Resque.queue_from_class(self)}", Resque.encode(class: self, args: args))
def reenqueue(*args)
Resque.enqueue(self, *args)
end

def before_perform(*args)
unless can_lock_queue?(*args)
# can't get the lock, so place at the front of the queue
lpush(*args)
# can't get the lock, so re-enqueue the task
reenqueue(*args)

# and don't perform
raise Resque::Job::DontPerform
Expand Down
20 changes: 12 additions & 8 deletions spec/lib/lonely_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ def self.perform(account_id, *args); end
-> { job.perform }.should raise_error(Exception)
end

it 'should place self at the beginning of the queue if unable to acquire the lock' do
job1 = Resque::Job.create(:serial_work, 'SerialJob', %w[account_one job_one])
job2 = Resque::Job.create(:serial_work, 'SerialJob', %w[account_one job_two])
it 'should place self at the end of the queue if unable to acquire the lock' do
job1_payload = %w[account_one job_one]
job2_payload = %w[account_one job_two]
Resque::Job.create(:serial_work, 'SerialJob', job1_payload)
Resque::Job.create(:serial_work, 'SerialJob', job2_payload)

SerialJob.should_receive(:can_lock_queue?).and_return(false)

Expand All @@ -77,7 +79,7 @@ def self.perform(account_id, *args); end
job1.perform.should be_false

first_queue_element = Resque.reserve(:serial_work)
first_queue_element.should == job1
first_queue_element.payload["args"].should == [job2_payload]
end
end

Expand Down Expand Up @@ -111,9 +113,11 @@ def self.perform(account_id, *args); end
-> { job.perform }.should raise_error(Exception)
end

it 'should place self at the beginning of the queue if unable to acquire the lock' do
job1 = Resque::Job.create(:serial_work, 'SerialJobWithCustomRedisKey', %w[account_one job_one])
job2 = Resque::Job.create(:serial_work, 'SerialJobWithCustomRedisKey', %w[account_one job_two])
it 'should place self at the end of the queue if unable to acquire the lock' do
job1_payload = %w[account_one job_one]
job2_payload = %w[account_one job_two]
Resque::Job.create(:serial_work, 'SerialJobWithCustomRedisKey', job1_payload)
Resque::Job.create(:serial_work, 'SerialJobWithCustomRedisKey', job2_payload)

SerialJobWithCustomRedisKey.should_receive(:can_lock_queue?).and_return(false)

Expand All @@ -123,7 +127,7 @@ def self.perform(account_id, *args); end
job1.perform.should be_false

first_queue_element = Resque.reserve(:serial_work)
first_queue_element.should == job1
first_queue_element.payload["args"].should == [job2_payload]
end
end
end
Expand Down

0 comments on commit e9912fb

Please sign in to comment.