Skip to content

Commit

Permalink
Preserve job ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
wallace committed May 24, 2012
1 parent 3d11dd3 commit d6842ef
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
12 changes: 10 additions & 2 deletions lib/resque-lonely_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@ def unlock_queue(*args)
Resque.redis.del(redis_key(*args))
end

# Unfortunately, there's not a Resque interface for lpush so we have to
# role our own. This is based on Resque.push but we don't need to
# call Resque.watch_queue as the queue should already exist if we're
# unable to get the lock.
def lpush(*args)
Resque.redis.lpush("queue:#{Resque.queue_from_class(self)}", Resque.encode(class: self, args: args))
end

def before_perform(*args)
unless can_lock_queue?(*args)
# can't get the lock, so place self at the end of the queue
Resque.enqueue(self, *args)
# can't get the lock, so place at the front of the queue
lpush(*args)

# and don't perform
raise Resque::Job::DontPerform
Expand Down
26 changes: 14 additions & 12 deletions spec/lib/lonely_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,19 @@ def self.perform(account_id, *args); end
-> { job.perform }.should raise_error(Exception)
end

it 'should place self at the end of the queue if unable to acquire the lock' do
Resque.size(:serial_work).should == 0

job = Resque::Job.new(:serial_work, { 'class' => 'SerialJob', 'args' => %w[account_one job_one] })
it 'should place self at the beginning of the queue if unable to acquire the lock' do
job1 = Resque::Job.create(:serial_work, 'SerialJob', %w[account_one job_one])
job2 = Resque::Job.create(:serial_work, 'SerialJob', %w[account_one job_two])

SerialJob.should_receive(:can_lock_queue?).and_return(false)

# perform returns false when DontPerform exception is raised in
# before_perform callback
job.perform.should be_false
job1 = Resque.reserve(:serial_work)
job1.perform.should be_false

Resque.size(:serial_work).should == 1
first_queue_element = Resque.reserve(:serial_work)
first_queue_element.should == job1
end
end

Expand Down Expand Up @@ -110,18 +111,19 @@ def self.perform(account_id, *args); end
-> { job.perform }.should raise_error(Exception)
end

it 'should place self at the end of the queue if unable to acquire the lock' do
Resque.size(:serial_work).should == 0

job = Resque::Job.new(:serial_work, { 'class' => 'SerialJobWithCustomRedisKey', 'args' => %w[account_one job_one] })
it 'should place self at the beginning of the queue if unable to acquire the lock' do
job1 = Resque::Job.create(:serial_work, 'SerialJobWithCustomRedisKey', %w[account_one job_one])
job2 = Resque::Job.create(:serial_work, 'SerialJobWithCustomRedisKey', %w[account_one job_two])

SerialJobWithCustomRedisKey.should_receive(:can_lock_queue?).and_return(false)

# perform returns false when DontPerform exception is raised in
# before_perform callback
job.perform.should be_false
job1 = Resque.reserve(:serial_work)
job1.perform.should be_false

Resque.size(:serial_work).should == 1
first_queue_element = Resque.reserve(:serial_work)
first_queue_element.should == job1
end
end
end
Expand Down

2 comments on commit d6842ef

@letsbreelhere
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty awesome! And kinda weird that Resque itself doesn't provide this functionality :p

@wallace
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, yup, I agree.

Please sign in to comment.