Skip to content

Commit

Permalink
Merge branch 'master' into jbarnette/master
Browse files Browse the repository at this point in the history
Conflicts:

	tasks/jobs.rake
  • Loading branch information
Tobias Lütke committed Nov 1, 2008
2 parents 450908d + 249c5a9 commit b9ebbb4
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 279 deletions.
8 changes: 4 additions & 4 deletions README.textile
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ The library evolves around a delayed_jobs table which looks as follows:
table.integer :attempts, :default => 0
table.text :handler
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
table.string :locked_by
table.timestamps
table.datetime :run_at
table.datetime :locked_at
table.string :locked_by
table.timestamps
end

h2. Usage
Expand Down
171 changes: 94 additions & 77 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,28 @@
module Delayed

class DeserializationError < StandardError
end
end

class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
set_table_name :delayed_jobs

cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"


NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)'
cattr_accessor :worker_name, :min_priority, :max_priority
self.worker_name = "pid:#{Process.pid}"
self.min_priority = nil
self.max_priority = nil

NextTaskSQL = '(`locked_by` = ?) OR (`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?))'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

class LockError < StandardError
end
end

def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end

def payload_object
@payload_object ||= deserialize(self['handler'])
end
Expand All @@ -31,43 +32,59 @@ def name
text = handler.gsub(/\n/, ' ')
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
end

def payload_object=(object)
self['handler'] = object.to_yaml
end

def reschedule(message, time = nil)


def reschedule(message, time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5

self.attempts += 1
self.run_at = time
self.last_error = message
self.unlock
self.unlock
save!
else
logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
destroy
end
end



def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
Job.create(:payload_object => object, :priority => priority)
end

Job.create(:payload_object => object, :priority => priority.to_i)
end

def self.find_available(limit = 5)
time_now = db_time_now
ActiveRecord::Base.silence do
find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => limit)

time_now = db_time_now

sql = NextTaskSQL.dup
conditions = [time_now, time_now, worker_name]

if self.min_priority
sql << ' AND (`priority` >= ?)'
conditions << min_priority
end
end


if self.max_priority
sql << ' AND (`priority` <= ?)'
conditions << max_priority
end

conditions.unshift(sql)

ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
end


# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
Expand All @@ -92,30 +109,30 @@ def self.reserve(max_run_time = 4.hours)
job.reschedule e.message
logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
logger.error(e)
return job
return job
end
end

nil
end
end

# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
now = self.class.db_time_now

affected_rows = if locked_by != worker
affected_rows = if locked_by != worker

# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
end_sql

else
# We alrady own this job, this may happen if the job queue crashes.
else

# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
Expand All @@ -124,95 +141,95 @@ def lock_exclusively!(max_run_time, worker = worker_name)
end_sql

end
unless affected_rows == 1

unless affected_rows == 1
raise LockError, "Attempted to aquire exclusive lock failed"
end
end

self.locked_at = now
self.locked_by = worker
end
self.locked_by = worker
end

def unlock
self.locked_at = nil
self.locked_by = nil
end

def self.work_off(num = 100)
success, failure = 0, 0

num.times do

job = self.reserve do |j|
begin
j.perform
j.perform
success += 1
rescue
rescue
failure += 1
raise
end
end

break if job.nil?
end
end

return [success, failure]
end

private
def deserialize(source)

def deserialize(source)
attempt_to_load_file = true
begin
handler = YAML.load(source) rescue nil
return handler if handler.respond_to?(:perform)

begin
handler = YAML.load(source) rescue nil
return handler if handler.respond_to?(:perform)

if handler.nil?
if source =~ ParseObjectFromYaml

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
return handler if handler.respond_to?(:perform)
end
end

if handler.is_a?(YAML::Object)

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)

# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'

raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'

rescue TypeError, LoadError, NameError => e
raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."

raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
end

def attempt_to_load(klass)
klass.constantize
klass.constantize
end

def self.db_time_now
(ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
(ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end
protected

protected

def before_save
self.run_at ||= self.class.db_time_now
end
end

end
end
4 changes: 2 additions & 2 deletions lib/delayed/message_sending.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Delayed
module MessageSending
def send_later(method, *args)
def send_later(method, *args)
Delayed::Job.enqueue Delayed::PerformableMethod.new(self, method.to_sym, args)
end
end
end
end
33 changes: 20 additions & 13 deletions lib/delayed/performable_method.rb
Original file line number Diff line number Diff line change
@@ -1,40 +1,47 @@
module Delayed
class PerformableMethod < Struct.new(:object, :method, :args)
AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/

class PerformableMethod < Struct.new(:object, :method, :args)
CLASS_STRING_FORMAT = /^CLASS\:([A-Z]\w+)$/
AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/

def initialize(object, method, args)
raise NoMethodError, "undefined method `#{method}' for #{self.inspect}" unless object.respond_to?(method)

self.object = dump(object)
self.args = args.map { |a| dump(a) }
self.method = method.to_sym
end

def perform
load(object).send(method, *args.map{|a| load(a)})
rescue ActiveRecord::RecordNotFound
# We cannot do anything about objects which were deleted in the meantime
true
end
true
end

private

def load(arg)
case arg
when AR_STRING_FORMAT then $1.constantize.find($2)
when CLASS_STRING_FORMAT then $1.constantize
when AR_STRING_FORMAT then $1.constantize.find($2)
else arg
end
end

def dump(arg)
case arg
when Class then class_to_string(arg)
when ActiveRecord::Base then ar_to_string(arg)
else arg
end
end
end

def ar_to_string(obj)
"AR:#{obj.class}:#{obj.id}"
end
end

def class_to_string(obj)
"CLASS:#{obj.name}"
end
end
end
Loading

0 comments on commit b9ebbb4

Please sign in to comment.