Skip to content

Commit

Permalink
better processing of pending messages in queue mode
Browse files Browse the repository at this point in the history
  • Loading branch information
fetep committed Jul 11, 2012
1 parent 9646be9 commit 0c32ea0
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions lib/logstash/outputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,16 @@ def register
)
end

if @batch
@flush_thread = Thread.new do
while sleep(@batch_timeout) do
process_pending(true)
end
end
end

@redis = nil
@pending_mutex = Mutex.new
end # def register

private
Expand Down Expand Up @@ -137,14 +146,18 @@ def receive(event)

private
def process_pending(force=false)
if !@pending_mutex.try_lock # failed to get lock
return
end

pending_count = 0
@pending.each { |k, v| pending_count += v.length }
time_since_last_flush = Time.now.to_f - @last_pending_flush

if force ||
if (force && pending_count > 0) ||
(pending_count >= @batch_events) ||
(time_since_last_flush >= @batch_timeout)
@logger.debug("Flushing redis output",
(time_since_last_flush >= @batch_timeout && pending_count > 0)
@logger.warn("Flushing redis output",
:pending_count => pending_count,
:time_since_last_flush => time_since_last_flush,
:batch_events => @batch_events,
Expand All @@ -158,13 +171,16 @@ def process_pending(force=false)
end
@last_pending_flush = Time.now.to_f
rescue => e
@pending_mutex.unlock
@logger.warn("Failed to send backlog of events to redis",
:pending => pending,
:identity => identity, :exception => e,
:backtrace => e.backtrace)
raise e
end
end

@pending_mutex.unlock
end

public
Expand Down

0 comments on commit 0c32ea0

Please sign in to comment.